Commit 136f687c authored by Shixiong Zhu, committed by Tathagata Das

[SPARK-18477][SS] Enable interrupts for HDFS in HDFSMetadataLog


## What changes were proposed in this pull request?

An HDFS `write` may simply hang until it times out if a network error occurs. Enabling interrupts makes it possible to stop the query quickly on HDFS.

This PR changes the logic to disable interrupts only for the local file system, since HADOOP-10622 affects only the local file system.
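
For background, Spark's `UninterruptibleThread` works by deferring any `Thread.interrupt()` that arrives inside a `runUninterruptibly` block until the block returns. Below is a minimal sketch of that behavior, not code from this PR: the class is `private[spark]`, so this only compiles inside Spark's own packages, and the sleep is a stand-in for a slow write.

```scala
import org.apache.spark.util.UninterruptibleThread

val t = new UninterruptibleThread("metadata-writer") {
  override def run(): Unit = {
    runUninterruptibly {
      // An interrupt arriving here is buffered, not delivered, so a write
      // hanging inside this block cannot be cancelled until it finishes.
      Thread.sleep(1000) // stand-in for a slow or hanging write
    }
    // Any deferred interrupt is re-delivered once the block above exits.
  }
}
t.start()
t.interrupt() // takes effect only after runUninterruptibly completes
t.join()
```

This is exactly why leaving writes uninterruptible on HDFS was a problem: a write hanging on a broken network could not be interrupted and had to time out on its own.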

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15911 from zsxwing/interrupt-on-dfs.

(cherry picked from commit e5f5c29e)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
parent 6717981e
@@ -105,25 +105,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /**
    * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
    * metadata has already been stored, this method will return `false`.
-   *
-   * Note that this method must be called on a [[org.apache.spark.util.UninterruptibleThread]]
-   * so that interrupts can be disabled while writing the batch file. This is because there is a
-   * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread
-   * running "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our
-   * case, `writeBatch` creates a file using HDFS API and calls "Shell.runCommand" to set the
-   * file permissions, and can get deadlocked if the stream execution thread is stopped by
-   * interrupt. Hence, we make sure that this method is called on [[UninterruptibleThread]] which
-   * allows us to disable interrupts here. Also see SPARK-14131.
    */
   override def add(batchId: Long, metadata: T): Boolean = {
     get(batchId).map(_ => false).getOrElse {
       // Only write metadata when the batch has not yet been written
-      Thread.currentThread match {
-        case ut: UninterruptibleThread =>
-          ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
-        case _ =>
-          throw new IllegalStateException(
-            "HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread")
+      if (fileManager.isLocalFileSystem) {
+        Thread.currentThread match {
+          case ut: UninterruptibleThread =>
+            // When using a local file system, "writeBatch" must be called on a
+            // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
+            // while writing the batch file. This is because there is a potential dead-lock in
+            // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
+            // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
+            // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
+            // the file permission if using the local file system, and can get deadlocked if the
+            // stream execution thread is stopped by interrupt. Hence, we make sure that
+            // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
+            // interrupts here. Also see SPARK-14131.
+            ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+          case _ =>
+            throw new IllegalStateException(
+              "HDFSMetadataLog.add() on a local file system must be executed on " +
+                "a o.a.spark.util.UninterruptibleThread")
+        }
+      } else {
+        // For a distributed file system, such as HDFS or S3, if the network is broken, write
+        // operations may just hang until timeout. We should enable interrupts to allow stopping
+        // the query fast.
+        writeBatch(batchId, metadata, serialize)
       }
       true
     }
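
Stripped of the diff markers, the new control flow in `add` reduces to a small pattern: take the uninterruptible path only when the underlying file system requires it. A minimal sketch, with hypothetical `needsUninterruptible` and `write` as stand-ins for `fileManager.isLocalFileSystem` and `writeBatch` (not the actual Spark code):

```scala
import org.apache.spark.util.UninterruptibleThread

// Hypothetical stand-ins for fileManager.isLocalFileSystem and writeBatch.
def needsUninterruptible: Boolean = true
def write(): Unit = ()

def guardedWrite(): Unit =
  if (needsUninterruptible) {
    Thread.currentThread match {
      case ut: UninterruptibleThread =>
        // Defer interrupts for the duration of the write (HADOOP-10622 guard).
        ut.runUninterruptibly { write() }
      case _ =>
        throw new IllegalStateException("must run on an UninterruptibleThread")
    }
  } else {
    // Interruptible path: a write hanging on a broken network can be cancelled.
    write()
  }
```

The design choice is to keep the strict thread requirement only where the deadlock can actually occur, rather than paying the cost of uninterruptibility everywhere.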
@@ -298,6 +307,9 @@ object HDFSMetadataLog {
     /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
     def delete(path: Path): Unit
+
+    /** Whether the file system is a local FS. */
+    def isLocalFileSystem: Boolean
   }
@@ -342,6 +354,13 @@ object HDFSMetadataLog {
         // ignore if file has already been deleted
       }
     }
+
+    override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
+      case _: local.LocalFs | _: local.RawLocalFs =>
+        // LocalFs = RawLocalFs + ChecksumFs
+        true
+      case _ => false
+    }
   }
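
The `FileContextManager` check above can be exercised on its own. A hedged sketch, assuming Hadoop is on the classpath (`isLocalPath` is a hypothetical helper name; `LocalFs` and `RawLocalFs` live in `org.apache.hadoop.fs.local`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}

// Hypothetical standalone version of the check: resolve the default
// AbstractFileSystem behind a path's URI and test whether it is local.
def isLocalPath(path: Path, conf: Configuration): Boolean =
  FileContext.getFileContext(path.toUri, conf).getDefaultFileSystem match {
    case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
    case _ => false
  }
```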
@@ -398,5 +417,12 @@ object HDFSMetadataLog {
         // ignore if file has already been deleted
       }
     }
+
+    override def isLocalFileSystem: Boolean = fs match {
+      case _: LocalFileSystem | _: RawLocalFileSystem =>
+        // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
+        true
+      case _ => false
+    }
   }
 }
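
As a quick sanity check of the `FileSystemManager` variant, one can print which `FileSystem` class a URI resolves to (assumes Hadoop on the classpath; by default a `file:` URI resolves to `LocalFileSystem`, while an `hdfs:` URI resolves to `DistributedFileSystem`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
// Prints org.apache.hadoop.fs.LocalFileSystem: the checksummed wrapper
// around RawLocalFileSystem that the new match treats as local.
println(new Path("file:/tmp").getFileSystem(conf).getClass.getName)
```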