Commit 1cafc76e authored by Shixiong Zhu, committed by Reynold Xin

[SPARK-18774][CORE][SQL] Ignore non-existing files when ignoreCorruptFiles is enabled (branch 2.1)

## What changes were proposed in this pull request?

Backport #16203 to branch 2.1.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16216 from zsxwing/SPARK-18774-2.1.
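
For context (not part of the commit), a minimal usage sketch of the behavior this backport enables; the paths below are hypothetical:

```scala
// Hypothetical paths; assumes some input files are corrupted or were deleted
// after the job started listing them.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ignore-corrupt-files-demo")
  .config("spark.files.ignoreCorruptFiles", "true")      // core (RDD) reads
  .config("spark.sql.files.ignoreCorruptFiles", "true")  // SQL / DataFrame reads
  .getOrCreate()

// Rows from intact files are returned; corrupted or missing files are skipped
// with a warning instead of failing the job.
spark.read.parquet("/data/events").count()
spark.sparkContext.textFile("/data/logs/*.txt").count()
```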
parent fcd22e53
@@ -203,7 +203,8 @@ package object config {
   private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
-      "encountering corrupt files and contents that have been read will still be returned.")
+      "encountering corrupted or non-existing files and contents that have been read will still " +
+      "be returned.")
     .booleanConf
     .createWithDefault(false)
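
As an aside, a hedged sketch of how this core flag can be set and read back through the public `SparkConf` API; internally, Spark reads the typed `IGNORE_CORRUPT_FILES` entry defined above:

```scala
// Sketch only: set the core flag and read it back via the public SparkConf API.
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "true")
val ignoreCorruptFiles = conf.getBoolean("spark.files.ignoreCorruptFiles", false)  // default: false
```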
@@ -210,12 +210,12 @@ class HadoopRDD[K, V](
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new NextIterator[(K, V)] {
-      val split = theSplit.asInstanceOf[HadoopPartition]
+      private val split = theSplit.asInstanceOf[HadoopPartition]
       logInfo("Input split: " + split.inputSplit)
-      val jobConf = getJobConf()
+      private val jobConf = getJobConf()
-      val inputMetrics = context.taskMetrics().inputMetrics
-      val existingBytesRead = inputMetrics.bytesRead
+      private val inputMetrics = context.taskMetrics().inputMetrics
+      private val existingBytesRead = inputMetrics.bytesRead
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
@@ -225,7 +225,7 @@ class HadoopRDD[K, V](
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
+      private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
         case _: FileSplit | _: CombineFileSplit =>
           SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
         case _ => None
@@ -235,23 +235,31 @@ class HadoopRDD[K, V](
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
-      def updateBytesRead(): Unit = {
+      private def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
           inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
-      var reader: RecordReader[K, V] = null
-      val inputFormat = getInputFormat(jobConf)
+      private var reader: RecordReader[K, V] = null
+      private val inputFormat = getInputFormat(jobConf)
       HadoopRDD.addLocalConfiguration(
         new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
         context.stageId, theSplit.index, context.attemptNumber, jobConf)
-      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+      reader =
+        try {
+          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+        } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
+            finished = true
+            null
+        }
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener{ context => closeIfNeeded() }
-      val key: K = reader.createKey()
-      val value: V = reader.createValue()
+      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
+      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
       override def getNext(): (K, V) = {
         try {
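
The hunk above wraps reader creation so that an `IOException` (which includes `FileNotFoundException` for files removed after the splits were computed) marks the partition as finished instead of failing the task. A standalone sketch of the same pattern, with hypothetical names:

```scala
// Sketch only: open a record reader, or skip the file when the ignore flag is set.
import java.io.IOException

def openReaderOrSkip[R](ignoreCorruptFiles: Boolean)(open: => R)(onSkip: Throwable => Unit): Option[R] = {
  try {
    Some(open)
  } catch {
    case e: IOException if ignoreCorruptFiles =>
      onSkip(e)  // e.g. log a warning and set finished = true
      None       // callers must tolerate the absent reader (cf. the null checks above)
  }
}
```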
@@ -132,12 +132,12 @@ class NewHadoopRDD[K, V](
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new Iterator[(K, V)] {
-      val split = theSplit.asInstanceOf[NewHadoopPartition]
+      private val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = getConf
+      private val conf = getConf
-      val inputMetrics = context.taskMetrics().inputMetrics
-      val existingBytesRead = inputMetrics.bytesRead
+      private val inputMetrics = context.taskMetrics().inputMetrics
+      private val existingBytesRead = inputMetrics.bytesRead
       // Sets the thread local variable for the file's name
       split.serializableHadoopSplit.value match {
@@ -147,39 +147,51 @@ class NewHadoopRDD[K, V](
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
-        case _: FileSplit | _: CombineFileSplit =>
-          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
-        case _ => None
-      }
+      private val getBytesReadCallback: Option[() => Long] =
+        split.serializableHadoopSplit.value match {
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          case _ => None
+        }
       // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
-      def updateBytesRead(): Unit = {
+      private def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
           inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
-      val format = inputFormatClass.newInstance
+      private val format = inputFormatClass.newInstance
       format match {
         case configurable: Configurable =>
           configurable.setConf(conf)
         case _ =>
       }
-      val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-      val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-      private var reader = format.createRecordReader(
-        split.serializableHadoopSplit.value, hadoopAttemptContext)
-      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+      private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+      private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+      private var finished = false
+      private var reader =
+        try {
+          val _reader = format.createRecordReader(
+            split.serializableHadoopSplit.value, hadoopAttemptContext)
+          _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+          _reader
+        } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(
+              s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
+              e)
+            finished = true
+            null
+        }
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())
-      var havePair = false
-      var finished = false
-      var recordsSinceMetricsUpdate = 0
+      private var havePair = false
+      private var recordsSinceMetricsUpdate = 0
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
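
The `NewHadoopRDD` change follows the same pattern; the reason a single `IOException` case also covers non-existing files is that `FileNotFoundException` is a subclass of `IOException`. A tiny illustration (not from the patch):

```scala
// FileNotFoundException is an IOException, so the guarded case above also
// matches attempts to read files that no longer exist.
import java.io.{FileNotFoundException, IOException}

val missingFile: IOException = new FileNotFoundException("part-00000 no longer exists")
val skipped = missingFile match {
  case _: IOException => true  // the branch taken when ignoreCorruptFiles is enabled
}
assert(skipped)
```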
@@ -150,6 +150,9 @@ class FileScanRDD(
             currentIterator = readFunction(currentFile)
           }
         } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+            currentIterator = Iterator.empty
           case e: java.io.FileNotFoundException =>
             throw new java.io.FileNotFoundException(
               e.getMessage + "\n" +
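
In `FileScanRDD`, the fallback is an empty iterator rather than a null reader, so the surrounding loop simply moves on to the next file. A sketch of that idea with hypothetical names:

```scala
// Sketch only: when opening a file fails and the flag is set, substitute an
// empty iterator so the outer loop advances to the next file.
import java.io.IOException

def iteratorForFile[T](ignoreCorruptFiles: Boolean)(open: => Iterator[T]): Iterator[T] = {
  try {
    open
  } catch {
    case e: IOException if ignoreCorruptFiles =>
      // a real implementation would log the skipped file here
      Iterator.empty
  }
}

// e.g. files.flatMap(file => iteratorForFile(ignoreCorruptFiles = true)(readFunction(file)))
```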
@@ -606,7 +606,8 @@ object SQLConf {
   val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
-      "encountering corrupt files and contents that have been read will still be returned.")
+      "encountering corrupted or non-existing and contents that have been read will still be " +
+      "returned.")
     .booleanConf
     .createWithDefault(false)
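
A hedged SQL-side usage sketch (path hypothetical) of the "non-existing file" case this backport targets: a file listed when the DataFrame was planned disappears before the scan runs.

```scala
val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val df = spark.read.parquet("/warehouse/events")
// ... one of the listed part files is deleted externally here ...
df.count()  // logs a warning for the missing file and returns the remaining rows
```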