Commit d3ef6933 authored by Shixiong Zhu

[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`, which is wrong: a `SynchronousQueue` has zero capacity and therefore cannot cache any task. This patch switches to a `LinkedBlockingQueue`, along with related fixes, so that `newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads and, once that limit is reached, caches further tasks in the `LinkedBlockingQueue`.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9978 from zsxwing/cached-threadpool.
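
For context, a minimal standalone sketch (an editorial example, not part of the patch) of why the old configuration cannot cache tasks: a `SynchronousQueue` has zero capacity, so `execute` either hands a task to an already-waiting thread, spawns a new thread up to `maximumPoolSize`, or rejects the task outright.

import java.util.concurrent.{RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object SynchronousQueueDemo {
  def main(args: Array[String]): Unit = {
    // Old configuration: corePoolSize = 0, maximumPoolSize = 2,
    // zero-capacity hand-off queue.
    val pool = new ThreadPoolExecutor(
      0, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
    try {
      // The first two tasks each get a new thread; the third finds no idle
      // thread and no queue capacity, so it is rejected instead of cached.
      for (_ <- 1 to 3) {
        pool.execute(new Runnable {
          override def run(): Unit = Thread.sleep(1000)
        })
      }
    } catch {
      case e: RejectedExecutionException => println(s"Task rejected: $e")
    } finally {
      pool.shutdownNow()
    }
  }
}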
parent 068b6438
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingQueue, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**
...
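
A brief editorial sketch (not from the patch) of the `ThreadPoolExecutor` semantics the fix relies on: a `LinkedBlockingQueue` is unbounded and never reports itself full, so the pool never grows past corePoolSize, which is why maximumPoolSize is effectively unused. Setting corePoolSize to the cap makes the pool spawn one thread per task up to the cap and queue the rest, while `allowCoreThreadTimeOut(true)` reclaims idle threads after the keep-alive period. The printed values assume the 500 ms tasks are still running when the first line executes.

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object CachedPoolDemo {
  def main(args: Array[String]): Unit = {
    // Same shape as the patched pool: core == max == 2, unbounded queue,
    // 1-second keep-alive, core threads allowed to time out.
    val pool = new ThreadPoolExecutor(
      2, 2, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable])
    pool.allowCoreThreadTimeOut(true)

    // Three 500 ms tasks: two run immediately, the third waits in the queue.
    for (_ <- 1 to 3) {
      pool.execute(new Runnable {
        override def run(): Unit = Thread.sleep(500)
      })
    }
    println(s"threads = ${pool.getPoolSize}, queued = ${pool.getQueue.size}") // 2, 1

    // After the tasks finish and the keep-alive elapses, idle threads die.
    Thread.sleep(3000)
    println(s"threads after idle = ${pool.getPoolSize}") // 0
    pool.shutdownNow()
  }
}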
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit one more task; it should be queued because the pool has
+      // reached its thread limit.
+      cachedThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          latch.await(10, TimeUnit.SECONDS)
+        }
+      })
+
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 1)
+
+      latch.countDown()
+      eventually(timeout(10.seconds)) {
+        // All threads should be stopped after keepAliveSeconds
+        assert(cachedThreadPool.getActiveCount === 0)
+        assert(cachedThreadPool.getPoolSize === 0)
+      }
+    } finally {
+      cachedThreadPool.shutdownNow()
+    }
+  }
+
   test("sameThread") {
     val callerThreadName = Thread.currentThread().getName()
     val f = Future {
...