Skip to content
Snippets Groups Projects
Commit 4bba10c4 authored by Kousuke Saruta's avatar Kousuke Saruta Committed by Andrew Or
Browse files

[SPARK-3233] Executor never stop its SparnEnv, BlockManager, ConnectionManager etc.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #2138 from sarutak/SPARK-3233 and squashes the following commits:

c0205b7 [Kousuke Saruta] Merge branch 'SPARK-3233' of github.com:sarutak/spark into SPARK-3233
064679d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
d3005fd [Kousuke Saruta] Modified Class definition format of BlockManagerMaster
039b747 [Kousuke Saruta] Modified style
889e2d1 [Kousuke Saruta] Modified BlockManagerMaster to be able to be past isDriver flag
4da8535 [Kousuke Saruta] Modified BlockManagerMaster#stop to send StopBlockManagerMaster message when sender is Driver
6518c3a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
d5ab19a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
6bce25c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
6058a58 [Kousuke Saruta] Modified Executor not to invoke SparkEnv#stop in local mode
e5ad9d3 [Kousuke Saruta] Modified Executor to stop SparnEnv at the end of itself
parent e08ea739
No related branches found
No related tags found
No related merge requests found
......@@ -225,7 +225,7 @@ object SparkEnv extends Logging {
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, mapOutputTracker, shuffleManager)
......
......@@ -123,6 +123,9 @@ private[spark] class Executor(
env.metricsSystem.report()
isStopped = true
threadPool.shutdown()
if (!isLocal) {
env.stop()
}
}
class TaskRunner(
......
......@@ -27,7 +27,11 @@ import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark]
class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
class BlockManagerMaster(
var driverActor: ActorRef,
conf: SparkConf,
isDriver: Boolean)
extends Logging {
private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf)
private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf)
......@@ -196,7 +200,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
/** Stop the driver actor, called only on the Spark driver node */
def stop() {
if (driverActor != null) {
if (driverActor != null && isDriver) {
tell(StopBlockManagerMaster)
driverActor = null
logInfo("BlockManagerMaster stopped")
......
......@@ -99,7 +99,7 @@ private[spark] object ThreadingTest {
val serializer = new KryoSerializer(conf)
val blockManagerMaster = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
conf)
conf, true)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf))
......
......@@ -120,7 +120,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
*/
val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
// stub out BlockManagerMaster.getLocations to use our cacheLocations
val blockManagerMaster = new BlockManagerMaster(null, conf) {
val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
blockIds.map {
_.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
......
......@@ -93,7 +93,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
master = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
conf)
conf, true)
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment