Commit 9f742241 authored by zsxwing, committed by Andrew Or

[SPARK-6602] [CORE] Remove some places in core that call SparkEnv.actorSystem

Author: zsxwing <zsxwing@gmail.com>

Closes #6333 from zsxwing/remove-actor-system-usage and squashes the following commits:

f125aa6 [zsxwing] Fix YarnAllocatorSuite
ceadcf6 [zsxwing] Change the "port" parameter type of "AkkaUtils.address" to "Int"; update ApplicationMaster and YarnAllocator to get the driverUrl from RpcEnv
3239380 [zsxwing] Remove some places in core that call SparkEnv.actorSystem
parent 2e9a5f22
org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -19,9 +19,9 @@ package org.apache.spark.scheduler
 import java.nio.ByteBuffer
 import java.util.{TimerTask, Timer}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.concurrent.duration._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
@@ -32,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
@@ -64,6 +64,9 @@ private[spark] class TaskSchedulerImpl(
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
 
+  private val speculationScheduler =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
+
   // Threshold above which we warn user initial TaskSet may be starved
   val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
@@ -142,10 +145,11 @@ private[spark] class TaskSchedulerImpl(
     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
       logInfo("Starting speculative execution thread")
-      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
-        SPECULATION_INTERVAL_MS milliseconds) {
-        Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
-      }(sc.env.actorSystem.dispatcher)
+      speculationScheduler.scheduleAtFixedRate(new Runnable {
+        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
+          checkSpeculatableTasks()
+        }
+      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
     }
   }
@@ -412,6 +416,7 @@ private[spark] class TaskSchedulerImpl(
   }
 
   override def stop() {
+    speculationScheduler.shutdown()
    if (backend != null) {
      backend.stop()
    }
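For context, the replacement scheduling code is plain java.util.concurrent rather than Akka. A self-contained sketch of the same pattern, with a hand-rolled daemon ThreadFactory standing in for Spark's ThreadUtils.newDaemonSingleThreadScheduledExecutor (the task body and the sleep are illustrative):

import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object SpeculationSketch {
  // Stand-in for ThreadUtils.newDaemonSingleThreadScheduledExecutor: a
  // single-threaded scheduler whose thread will not block JVM shutdown.
  private def daemonScheduler(name: String): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true)
        t
      }
    })

  def main(args: Array[String]): Unit = {
    val scheduler = daemonScheduler("task-scheduler-speculation")
    val intervalMs = 100L  // mirrors spark.speculation.interval's 100ms default
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = println("checking for speculatable tasks")
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    Thread.sleep(350)      // let a few ticks fire
    scheduler.shutdown()   // same teardown TaskSchedulerImpl.stop() now performs
  }
}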
org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

@@ -25,9 +25,10 @@ import scala.collection.mutable.{HashMap, HashSet}
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
@@ -115,11 +116,9 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = AkkaUtils.address(
-      AkkaUtils.protocol(sc.env.actorSystem),
+    val driverUrl = sc.env.rpcEnv.uriOf(
       SparkEnv.driverActorSystemName,
-      conf.get("spark.driver.host"),
-      conf.get("spark.driver.port"),
+      RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
     val uri = conf.getOption("spark.executor.uri")
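Both the old and new code paths produce the same endpoint URI; for the Akka-backed RpcEnv of this era it has the shape shown by AkkaUtils.address below. A minimal sketch of what the call computes: "sparkDriver" and "CoarseGrainedScheduler" are Spark's values for SparkEnv.driverActorSystemName and CoarseGrainedSchedulerBackend.ENDPOINT_NAME at the time of this commit, the akka.tcp scheme is the non-SSL case, and the host and port are made up:

case class RpcAddress(host: String, port: Int) {
  def hostPort: String = s"$host:$port"
}

// Mirrors the format string in AkkaUtils.address, specialized to akka.tcp.
def uriOf(systemName: String, address: RpcAddress, endpointName: String): String =
  s"akka.tcp://$systemName@${address.hostPort}/user/$endpointName"

val driverUrl = uriOf(
  "sparkDriver",                            // SparkEnv.driverActorSystemName
  RpcAddress("driver.example.com", 50321),  // from spark.driver.host / spark.driver.port
  "CoarseGrainedScheduler")                 // CoarseGrainedSchedulerBackend.ENDPOINT_NAME
// driverUrl == "akka.tcp://sparkDriver@driver.example.com:50321/user/CoarseGrainedScheduler"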
org/apache/spark/util/AkkaUtils.scala

@@ -235,7 +235,7 @@ private[spark] object AkkaUtils extends Logging {
       protocol: String,
       systemName: String,
       host: String,
-      port: Any,
+      port: Int,
       actorName: String): String = {
     s"$protocol://$systemName@$host:$port/user/$actorName"
   }
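Tightening port from Any to Int is a small type-safety fix: Any accepted any argument and deferred mistakes to runtime, while Int forces callers (as in the call-site changes above) to parse spark.driver.port once, up front. A sketch of the difference:

def address(protocol: String, systemName: String, host: String,
    port: Int, actorName: String): String =
  s"$protocol://$systemName@$host:$port/user/$actorName"

address("akka.tcp", "sparkDriver", "localhost", 50321, "CoarseGrainedScheduler")  // compiles
// address("akka.tcp", "sparkDriver", "localhost", Some(50321), "CoarseGrainedScheduler")
// no longer compiles. Under `port: Any` that call was accepted and interpolated
// as "...@localhost:Some(50321)/...", a malformed URL caught only at connect time.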
org/apache/spark/deploy/yarn/ApplicationMaster.scala

@@ -34,7 +34,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
 import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util._
@@ -220,7 +220,7 @@ private[spark] class ApplicationMaster(
     sparkContextRef.compareAndSet(sc, null)
   }
 
-  private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
+  private def registerAM(_rpcEnv: RpcEnv, uiAddress: String, securityMgr: SecurityManager) = {
     val sc = sparkContextRef.get()
 
     val appId = client.getAttemptId().getApplicationId().toString()
@@ -231,8 +231,14 @@ private[spark] class ApplicationMaster(
       .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
       .getOrElse("")
 
-    allocator = client.register(yarnConf,
-      if (sc != null) sc.getConf else sparkConf,
+    val _sparkConf = if (sc != null) sc.getConf else sparkConf
+    val driverUrl = _rpcEnv.uriOf(
+      SparkEnv.driverActorSystemName,
+      RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt),
+      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
+    allocator = client.register(driverUrl,
+      yarnConf,
+      _sparkConf,
       if (sc != null) sc.preferredNodeLocationData else Map(),
       uiAddress,
       historyAddress,
@@ -279,7 +285,7 @@ private[spark] class ApplicationMaster(
         sc.getConf.get("spark.driver.host"),
         sc.getConf.get("spark.driver.port"),
         isClusterMode = true)
-      registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
+      registerAM(rpcEnv, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
       userClassThread.join()
     }
   }
@@ -289,7 +295,7 @@ private[spark] class ApplicationMaster(
     rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)
     waitForSparkDriver()
     addAmIpFilter()
-    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
+    registerAM(rpcEnv, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
org/apache/spark/deploy/yarn/YarnAllocator.scala

@@ -34,10 +34,8 @@ import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.AkkaUtils
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -53,6 +51,7 @@ import org.apache.spark.util.AkkaUtils
  * synchronized.
  */
 private[yarn] class YarnAllocator(
+    driverUrl: String,
     conf: Configuration,
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],
@@ -107,13 +106,6 @@ private[yarn] class YarnAllocator(
     new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
   launcherPool.allowCoreThreadTimeOut(true)
 
-  private val driverUrl = AkkaUtils.address(
-    AkkaUtils.protocol(securityMgr.akkaSSLOptions.enabled),
-    SparkEnv.driverActorSystemName,
-    sparkConf.get("spark.driver.host"),
-    sparkConf.get("spark.driver.port"),
-    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
-
   // For testing
   private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
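Note that the deleted block was also the second place that derived the Akka wire protocol, here from securityMgr.akkaSSLOptions.enabled versus from a live ActorSystem in CoarseMesosSchedulerBackend. Centralizing URI construction in RpcEnv removes that duplication. Roughly what the removed protocol branch amounted to (scheme strings are the conventional Akka remoting values, shown for illustration):

// Sketch of AkkaUtils.protocol's SSL branch as used by the deleted code.
def protocol(sslEnabled: Boolean): String =
  if (sslEnabled) "akka.ssl.tcp" else "akka.tcp"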
org/apache/spark/deploy/yarn/YarnRMClient.scala

@@ -55,6 +55,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
    * @param uiHistoryAddress Address of the application on the History Server.
    */
   def register(
+      driverUrl: String,
       conf: YarnConfiguration,
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
@@ -72,7 +73,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
+    new YarnAllocator(driverUrl, conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
   }
 
   /**
org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

@@ -90,6 +90,7 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
       "--jar", "somejar.jar",
       "--class", "SomeClass")
     new YarnAllocator(
+      "not used",
       conf,
       sparkConf,
       rmClient,
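The test change shows the payoff of injecting driverUrl through the constructor: YarnAllocator no longer reaches into SparkConf and AkkaUtils to compute it, so a suite that never launches executors can pass any placeholder string. A minimal sketch of the pattern (class and method names are illustrative, not Spark's):

class Allocator(driverUrl: String) {
  // The URL is only interpolated into the executor launch command; the
  // allocation bookkeeping exercised by tests never touches it.
  def launchCommand(executorId: String): Seq[String] =
    Seq("bin/executor", "--driver-url", driverUrl, "--executor-id", executorId)
}

val production = new Allocator("akka.tcp://sparkDriver@driver.example.com:50321/user/CoarseGrainedScheduler")
val underTest  = new Allocator("not used")  // same placeholder YarnAllocatorSuite passes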