Commit 874a2ca9 authored by zsxwing's avatar zsxwing Committed by Reynold Xin

[SPARK-7174][Core] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread

`HeartbeatReceiver` calls `TaskScheduler.executorHeartbeatReceived`, which is a blocking operation because it eventually calls

```Scala
    blockManagerMaster.driverEndpoint.askWithReply[Boolean](
      BlockManagerHeartbeat(blockManagerId), 600 seconds)
```

Even though the ask targets a local endpoint, it can block the current Akka thread: the reply may be dispatched to the very thread that is blocked in the ask, so the reply is never processed. The extreme case is an Akka dispatcher thread pool of size 1.
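To make the failure mode concrete, here is a minimal, self-contained sketch (not Spark code) in which a single-threaded `ExecutionContext` stands in for a one-thread Akka dispatcher: the handler dispatches its own reply to the pool, then blocks waiting for it, so the reply can never run.

```Scala
import java.util.concurrent.{Executors, TimeoutException}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}

object SelfBlockDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    // One dispatcher thread, mirroring the extreme case described above.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val reply = Promise[Boolean]()

    val handler = Future {
      // The "handler" runs on the only dispatcher thread. It schedules the
      // reply on the same pool, then parks waiting for it; the reply can
      // never run because the sole thread is blocked right here.
      Future { reply.trySuccess(true) }
      Await.result(reply.future, 2.seconds)
    }

    try {
      Await.result(handler, 5.seconds)
    } catch {
      case _: TimeoutException => println("deadlocked: reply never processed")
    } finally {
      pool.shutdownNow()
    }
  }
}
```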

jstack log of the blocked dispatcher thread:

```
"sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f2a8c02d000 nid=0x725 waiting on condition [0x00007f2b1d6d0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
	at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
	at scala.concurrent.Await$.result(package.scala:107)
	at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
	at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
	at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
	at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

This PR moves the blocking operation to a separate thread.
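
Distilled to its essentials, the change has the following shape (a sketch using plain `java.util.concurrent`; `Context`, `onHeartbeat`, and `blockingHeartbeatCheck` are illustrative stand-ins, not Spark's API):

```Scala
import java.util.concurrent.Executors

// Illustrative stand-in for Spark's RpcCallContext.
final class Context {
  def reply(response: Any): Unit = println(s"replied: $response")
}

object HeartbeatOffload {
  // A dedicated single thread, analogous to the
  // "heartbeat-receiver-event-loop-thread" introduced by this commit.
  private val eventLoopThread = Executors.newSingleThreadScheduledExecutor()

  def onHeartbeat(context: Context, blockingHeartbeatCheck: () => Boolean): Unit = {
    // Cheap bookkeeping can stay on the RPC thread; the potentially blocking
    // call is submitted to the dedicated thread, which also sends the reply.
    eventLoopThread.submit(new Runnable {
      override def run(): Unit = context.reply(blockingHeartbeatCheck())
    })
  }
}
```

Using a single-threaded executor keeps heartbeat handling ordered while guaranteeing that the blocking ask can no longer park an Akka dispatcher thread.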

Author: zsxwing <zsxwing@gmail.com>

Closes #5723 from zsxwing/SPARK-7174 and squashes the following commits:

98bfe48 [zsxwing] Use a single thread for checking timeout and reporting executorHeartbeatReceived
5b3b545 [zsxwing] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking the Akka thread pool
parent 4d9e560b
```diff
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
 
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
-  private val timeoutCheckingThread =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
+  // "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
+  // block the thread for a long time.
+  private val eventLoopThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
 
   private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
 
   override def onStart(): Unit = {
-    timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
+    timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = Utils.tryLogNonFatalError {
         Option(self).foreach(_.send(ExpireDeadHosts))
       }
@@ -99,11 +101,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
       if (scheduler != null) {
-        val unknownExecutor = !scheduler.executorHeartbeatReceived(
-          executorId, taskMetrics, blockManagerId)
-        val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
         executorLastSeen(executorId) = System.currentTimeMillis()
-        context.reply(response)
+        eventLoopThread.submit(new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            val unknownExecutor = !scheduler.executorHeartbeatReceived(
+              executorId, taskMetrics, blockManagerId)
+            val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+            context.reply(response)
+          }
+        })
       } else {
         // Because Executor will sleep several seconds before sending the first "Heartbeat", this
         // case rarely happens. However, if it really happens, log it and ask the executor to
@@ -125,7 +131,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
       if (sc.supportDynamicAllocation) {
         // Asynchronously kill the executor to avoid blocking the current thread
         killExecutorThread.submit(new Runnable {
-          override def run(): Unit = sc.killExecutor(executorId)
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            sc.killExecutor(executorId)
+          }
         })
       }
       executorLastSeen.remove(executorId)
@@ -137,7 +145,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
     if (timeoutCheckingTask != null) {
       timeoutCheckingTask.cancel(true)
     }
-    timeoutCheckingThread.shutdownNow()
+    eventLoopThread.shutdownNow()
     killExecutorThread.shutdownNow()
   }
 }
```