Commit 8af1bf10 authored by Cheng Lian

[SPARK-7842] [SQL] Makes task committing/aborting in InsertIntoHadoopFsRelation more robust

When committing/aborting a write task issued in `InsertIntoHadoopFsRelation`, if `OutputWriter.close()` throws an exception, the committing/aborting process is interrupted and leaves leftover files behind (e.g., the `_temporary` directory created by `FileOutputCommitter`).

This PR makes these two processes more robust by catching potential exceptions and falling back to normal task commitment/abort.

Author: Cheng Lian <lian@databricks.com>

Closes #6378 from liancheng/spark-7838 and squashes the following commits:

f18253a [Cheng Lian] Makes task committing/aborting in InsertIntoHadoopFsRelation more robust
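
To make the pattern concrete, here is a minimal, self-contained Scala sketch of the fallback logic this commit introduces. The names `Writer`, `SimpleTaskCommitter`, `doCommit`, and `doAbort` are hypothetical stand-ins for `OutputWriter` and the underlying Hadoop committer calls, not actual Spark APIs:

    trait Writer {
      def close(): Unit
    }

    class SimpleTaskCommitter(writer: Writer) {
      // Hypothetical stand-ins for FileOutputCommitter.commitTask()/abortTask().
      protected def doCommit(): Unit = ()
      protected def doAbort(): Unit = ()

      def commitTask(): Unit = {
        try {
          // close() may throw (e.g. on a failed flush). If anything goes wrong
          // before the commit completes, fall back to aborting the task so
          // temporary output such as the _temporary directory is cleaned up.
          writer.close()
          doCommit()
        } catch {
          case cause: Throwable =>
            doAbort()
            throw new RuntimeException("Failed to commit task", cause)
        }
      }

      def abortTask(): Unit = {
        try {
          writer.close()
        } finally {
          // Always run the abort, even if close() throws.
          doAbort()
        }
      }
    }

Note the asymmetry: commitTask() converts a close() failure into an abort plus a rethrown exception, while abortTask() uses try/finally because the abort must run regardless of whether close() succeeds.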
parent bfeedc69
@@ -377,13 +377,22 @@ private[sql] class DefaultWriterContainer(
   override def outputWriterForRow(row: Row): OutputWriter = writer
 
   override def commitTask(): Unit = {
-    writer.close()
-    super.commitTask()
+    try {
+      writer.close()
+      super.commitTask()
+    } catch {
+      case cause: Throwable =>
+        super.abortTask()
+        throw new RuntimeException("Failed to commit task", cause)
+    }
   }
 
   override def abortTask(): Unit = {
-    writer.close()
-    super.abortTask()
+    try {
+      writer.close()
+    } finally {
+      super.abortTask()
+    }
   }
 
 }
@@ -422,13 +431,21 @@ private[sql] class DynamicPartitionWriterContainer(
   }
 
   override def commitTask(): Unit = {
-    outputWriters.values.foreach(_.close())
-    super.commitTask()
+    try {
+      outputWriters.values.foreach(_.close())
+      super.commitTask()
+    } catch { case cause: Throwable =>
+      super.abortTask()
+      throw new RuntimeException("Failed to commit task", cause)
+    }
   }
 
   override def abortTask(): Unit = {
-    outputWriters.values.foreach(_.close())
-    super.abortTask()
+    try {
+      outputWriters.values.foreach(_.close())
+    } finally {
+      super.abortTask()
+    }
   }
 
 }
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
 
 /**
@@ -67,7 +67,9 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends
     recordWriter.write(null, new Text(serialized))
   }
 
-  override def close(): Unit = recordWriter.close(context)
+  override def close(): Unit = {
+    recordWriter.close(context)
+  }
 }
 
 /**
@@ -120,3 +122,39 @@ class SimpleTextRelation(
     }
   }
 }
+
+/**
+ * A simple example [[HadoopFsRelationProvider]].
+ */
+class CommitFailureTestSource extends HadoopFsRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      schema: Option[StructType],
+      partitionColumns: Option[StructType],
+      parameters: Map[String, String]): HadoopFsRelation = {
+    new CommitFailureTestRelation(paths, schema, partitionColumns, parameters)(sqlContext)
+  }
+}
+
+class CommitFailureTestRelation(
+    override val paths: Array[String],
+    maybeDataSchema: Option[StructType],
+    override val userDefinedPartitionColumns: Option[StructType],
+    parameters: Map[String, String])(
+    @transient sqlContext: SQLContext)
+  extends SimpleTextRelation(
+    paths, maybeDataSchema, userDefinedPartitionColumns, parameters)(sqlContext) {
+  override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    override def newInstance(
+        path: String,
+        dataSchema: StructType,
+        context: TaskAttemptContext): OutputWriter = {
+      new SimpleTextOutputWriter(path, context) {
+        override def close(): Unit = {
+          sys.error("Intentional task commitment failure for testing purpose.")
+        }
+      }
+    }
+  }
+}
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.sources
 
 import org.apache.hadoop.fs.Path
+import org.scalatest.FunSuite
 
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHive
@@ -477,6 +479,26 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
   }
 }
 
+class CommitFailureTestRelationSuite extends FunSuite with SQLTestUtils {
+  import TestHive.implicits._
+
+  override val sqlContext = TestHive
+
+  val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName
+
+  test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
+    withTempPath { file =>
+      val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
+      intercept[SparkException] {
+        df.write.format(dataSourceName).save(file.getCanonicalPath)
+      }
+
+      val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
+      assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+    }
+  }
+}
+
 class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName