Skip to content
Snippets Groups Projects
Commit 01c999e7 authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Tathagata Das
Browse files

[SPARK-20461][CORE][SS] Use UninterruptibleThread for Executor and fix the...

[SPARK-20461][CORE][SS] Use UninterruptibleThread for Executor and fix the potential hang in CachedKafkaConsumer

## What changes were proposed in this pull request?

This PR changes Executor's threads to `UninterruptibleThread` so that we can use `runUninterruptibly` in `CachedKafkaConsumer`. However, this is just best effort to avoid hanging forever. If the user uses`CachedKafkaConsumer` in another thread (e.g., create a new thread or Future), the potential hang may still happen.

## How was this patch tested?

The new added test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17761 from zsxwing/int.
parent 606432a1
No related branches found
No related tags found
No related merge requests found
......@@ -23,13 +23,15 @@ import java.lang.management.ManagementFactory
import java.net.{URI, URL}
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent._
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.control.NonFatal
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
......@@ -84,7 +86,20 @@ private[spark] class Executor(
}
// Start worker thread pool
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val threadPool = {
val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Executor task launch worker-%d")
.setThreadFactory(new ThreadFactory {
override def newThread(r: Runnable): Thread =
// Use UninterruptibleThread to run tasks so that we can allow running codes without being
// interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
// will hang forever if some methods are interrupted.
new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
})
.build()
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
private val executorSource = new ExecutorSource(threadPool, executorId)
// Pool used for threads that supervise task killing / cancellation
private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper")
......
......@@ -27,7 +27,13 @@ import javax.annotation.concurrent.GuardedBy
*
* Note: "runUninterruptibly" should be called only in `this` thread.
*/
private[spark] class UninterruptibleThread(name: String) extends Thread(name) {
private[spark] class UninterruptibleThread(
target: Runnable,
name: String) extends Thread(target, name) {
def this(name: String) {
this(null, name)
}
/** A monitor to protect "uninterruptible" and "interrupted" */
private val uninterruptibleLock = new Object
......
......@@ -44,6 +44,7 @@ import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.UninterruptibleThread
class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually {
......@@ -158,6 +159,18 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
assert(failReason.isInstanceOf[FetchFailed])
}
test("Executor's worker threads should be UninterruptibleThread") {
val conf = new SparkConf()
.setMaster("local")
.setAppName("executor thread test")
.set("spark.ui.enabled", "false")
sc = new SparkContext(conf)
val executorThread = sc.parallelize(Seq(1), 1).map { _ =>
Thread.currentThread.getClass.getName
}.collect().head
assert(executorThread === classOf[UninterruptibleThread].getName)
}
test("SPARK-19276: OOMs correctly handled with a FetchFailure") {
// when there is a fatal error like an OOM, we don't do normal fetch failure handling, since it
// may be a false positive. And we should call the uncaught exception handler.
......
......@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.KafkaSource._
import org.apache.spark.util.UninterruptibleThread
/**
......@@ -62,11 +63,20 @@ private[kafka010] case class CachedKafkaConsumer private(
case class AvailableOffsetRange(earliest: Long, latest: Long)
private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
case ut: UninterruptibleThread =>
ut.runUninterruptibly(body)
case _ =>
logWarning("CachedKafkaConsumer is not running in UninterruptibleThread. " +
"It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894")
body
}
/**
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
def getAvailableOffsetRange(): AvailableOffsetRange = {
def getAvailableOffsetRange(): AvailableOffsetRange = runUninterruptiblyIfPossible {
consumer.seekToBeginning(Set(topicPartition).asJava)
val earliestOffset = consumer.position(topicPartition)
consumer.seekToEnd(Set(topicPartition).asJava)
......@@ -92,7 +102,8 @@ private[kafka010] case class CachedKafkaConsumer private(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
failOnDataLoss: Boolean):
ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
require(offset < untilOffset,
s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment