Skip to content
Snippets Groups Projects
Commit aa036451 authored by jerryshao's avatar jerryshao Committed by Marcelo Vanzin
Browse files

[SPARK-12447][YARN] Only update the states when executor is successfully launched

The details is described in https://issues.apache.org/jira/browse/SPARK-12447.

vanzin Please help to review, thanks a lot.

Author: jerryshao <sshao@hortonworks.com>

Closes #10412 from jerryshao/SPARK-12447.
parent b0768538
No related branches found
No related tags found
No related merge requests found
......@@ -55,15 +55,14 @@ private[yarn] class ExecutorRunnable(
executorCores: Int,
appId: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource])
extends Runnable with Logging {
localResources: Map[String, LocalResource]) extends Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
lazy val env = prepareEnvironment(container)
override def run(): Unit = {
def run(): Unit = {
logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(yarnConf)
......
......@@ -24,6 +24,7 @@ import java.util.regex.Pattern
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
......@@ -472,41 +473,58 @@ private[yarn] class YarnAllocator(
*/
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
for (container <- containersToUse) {
numExecutorsRunning += 1
assert(numExecutorsRunning <= targetNumExecutors)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
executorIdCounter += 1
val executorId = executorIdCounter.toString
assert(container.getResource.getMemory >= resource.getMemory)
logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
val executorRunnable = new ExecutorRunnable(
container,
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources)
def updateInternalState(): Unit = synchronized {
numExecutorsRunning += 1
assert(numExecutorsRunning <= targetNumExecutors)
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
}
if (launchContainers) {
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
driverUrl, executorHostname))
launcherPool.execute(executorRunnable)
launcherPool.execute(new Runnable {
override def run(): Unit = {
try {
new ExecutorRunnable(
container,
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run()
updateInternalState()
} catch {
case NonFatal(e) =>
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately to avoid unnecessary resource
// occupation.
amClient.releaseAssignedContainer(containerId)
}
}
})
} else {
// For test only
updateInternalState()
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment