Skip to content
Snippets Groups Projects
Commit 43706bf8 authored by Shixiong Zhu's avatar Shixiong Zhu
Browse files

[SPARK-12608][STREAMING] Remove submitJobThreadPool since submitJob doesn't...

[SPARK-12608][STREAMING] Remove submitJobThreadPool since submitJob doesn't create a separate thread to wait for the job result

Before #9264, submitJob would create a separate thread to wait for the job result. `submitJobThreadPool` was a workaround in `ReceiverTracker` to run these waiting-job-result threads. Now #9264 has been merged to master and resolved this blocking issue, `submitJobThreadPool` can be removed now.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10560 from zsxwing/remove-submitJobThreadPool.
parent b504b6a9
No related branches found
No related tags found
No related merge requests found
......@@ -435,10 +435,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
// TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
private val submitJobThreadPool = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))
private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))
......@@ -610,12 +606,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
}(submitJobThreadPool)
}(ThreadUtils.sameThread)
logInfo(s"Receiver ${receiver.streamId} started")
}
override def onStop(): Unit = {
submitJobThreadPool.shutdownNow()
active = false
walBatchingThreadPool.shutdown()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment