Commit 43dfac51 authored by Henry Saputra

Merge branch 'master' into removesemicolonscala

parents 10be58f2 f568912f
Showing with 87 additions and 52 deletions
@@ -199,6 +199,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
   }

   override def stop() {
+    stopExecutors()
     try {
       if (driverActor != null) {
         val future = driverActor.ask(StopDriver)(timeout)
...
@@ -62,7 +62,6 @@ private[spark] class SimrSchedulerBackend(
     val conf = new Configuration()
     val fs = FileSystem.get(conf)
     fs.delete(new Path(driverFilePath), false)
-    super.stopExecutors()
    super.stop()
  }
}
@@ -76,7 +76,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
       </tr>
     }

-    val execInfo = for (b <- 0 until storageStatusList.size) yield getExecInfo(b)
+    val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
     val execTable = UIUtils.listingTable(execHead, execRow, execInfo)

     val content =
@@ -99,16 +99,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors)
   }

-  def getExecInfo(a: Int): Seq[String] = {
-    val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId
-    val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort
-    val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString
-    val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
-    val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
-    val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
-    val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size).getOrElse(0)
-    val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0)
-    val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0)
+  def getExecInfo(statusId: Int): Seq[String] = {
+    val status = sc.getExecutorStorageStatus(statusId)
+    val execId = status.blockManagerId.executorId
+    val hostPort = status.blockManagerId.hostPort
+    val rddBlocks = status.blocks.size.toString
+    val memUsed = status.memUsed().toString
+    val maxMem = status.maxMem.toString
+    val diskUsed = status.diskUsed().toString
+    val activeTasks = listener.executorToTasksActive.getOrElse(execId, HashSet.empty[Long]).size
+    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
+    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
     val totalTasks = activeTasks + failedTasks + completedTasks

     Seq(
...
@@ -37,6 +37,8 @@ System Properties:
 * 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the Spark master, and also the number of tries it waits for the SparkContext to be initialized. Default is 10.
 * 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
 * 'spark.yarn.preserve.staging.files', set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than deleting them.
+* 'spark.yarn.scheduler.heartbeat.interval-ms', the interval in ms at which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
+* 'spark.yarn.max.worker.failures', the maximum number of worker failures before failing the application. Default is the number of workers requested times 2, with a minimum of 3.

 # Launching Spark on YARN
...
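As a hedged illustration of the two new properties (only the property names come from the doc lines above; the object name and values below are hypothetical), a driver could set them as system properties before creating its SparkContext, in the same style the example programs in this commit use:

// Hypothetical setup, Spark 0.8-era style: the YARN code reads these
// via System.getProperty, as shown in the ApplicationMaster diff below.
object YarnTuningExample {
  def main(args: Array[String]) {
    // Heartbeat into the YARN ResourceManager every 5 seconds (the default).
    System.setProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000")
    // Fail the application after 6 worker failures rather than the
    // computed default of math.max(numWorkers * 2, 3).
    System.setProperty("spark.yarn.max.worker.failures", "6")
    // ... construct the SparkContext as usual from here ...
  }
}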
@@ -32,13 +32,13 @@ object BroadcastTest {
     System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory")
     System.setProperty("spark.broadcast.blockSize", blockSize)

-    val sc = new SparkContext(args(0), "Broadcast Test 2",
+    val sc = new SparkContext(args(0), "Broadcast Test",
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

     val slices = if (args.length > 1) args(1).toInt else 2
     val num = if (args.length > 2) args(2).toInt else 1000000

-    var arr1 = new Array[Int](num)
+    val arr1 = new Array[Int](num)
     for (i <- 0 until arr1.length) {
       arr1(i) = i
     }
@@ -48,9 +48,9 @@ object BroadcastTest {
       println("===========")
       val startTime = System.nanoTime
       val barr1 = sc.broadcast(arr1)
-      sc.parallelize(1 to 10, slices).foreach {
-        i => println(barr1.value.size)
-      }
+      val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.size)
+      // Collect the small RDD so we can print the observed sizes locally.
+      observedSizes.collect().foreach(i => println(i))
       println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
     }
...
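The change above replaces println inside a remote foreach (whose output lands in executor logs, not the driver console) with map followed by collect. A minimal self-contained sketch of that pattern, assuming a local master and illustrative names:

import org.apache.spark.SparkContext

object CollectThenPrint {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Collect Then Print")
    val barr = sc.broadcast(Array.fill(1000)(1))
    // Sizes are computed on the executors; collect() ships the small
    // result array back so println runs on the driver.
    val observedSizes = sc.parallelize(1 to 10, 2).map(_ => barr.value.size)
    observedSizes.collect().foreach(println)
    sc.stop()
  }
}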
@@ -18,35 +18,38 @@
 package org.apache.spark.examples

 import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD

 object MultiBroadcastTest {
   def main(args: Array[String]) {
     if (args.length == 0) {
-      System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
+      System.err.println("Usage: MultiBroadcastTest <master> [<slices>] [numElem]")
       System.exit(1)
     }

-    val sc = new SparkContext(args(0), "Broadcast Test",
+    val sc = new SparkContext(args(0), "Multi-Broadcast Test",
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

     val slices = if (args.length > 1) args(1).toInt else 2
     val num = if (args.length > 2) args(2).toInt else 1000000

-    var arr1 = new Array[Int](num)
+    val arr1 = new Array[Int](num)
     for (i <- 0 until arr1.length) {
       arr1(i) = i
     }

-    var arr2 = new Array[Int](num)
+    val arr2 = new Array[Int](num)
     for (i <- 0 until arr2.length) {
       arr2(i) = i
     }

     val barr1 = sc.broadcast(arr1)
     val barr2 = sc.broadcast(arr2)
-    sc.parallelize(1 to 10, slices).foreach {
-      i => println(barr1.value.size + barr2.value.size)
-    }
+    val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ =>
+      (barr1.value.size, barr2.value.size)
+    }
+    // Collect the small RDD so we can print the observed sizes locally.
+    observedSizes.collect().foreach(i => println(i))

     System.exit(0)
   }
...
@@ -54,7 +54,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
+  // default to numWorkers * 2, with minimum of 3
+  private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+    math.max(args.numWorkers * 2, 3).toString()).toInt

   def run() {
     // setup the directories so things go to yarn approved directories rather
@@ -227,12 +229,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
         if (null != sparkContext) {
           uiAddress = sparkContext.ui.appUIAddress
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
-            sparkContext.preferredNodeLocationData)
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+            appAttemptId, args, sparkContext.preferredNodeLocationData)
         } else {
           logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
             ", numTries = " + numTries)
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+            appAttemptId, args)
         }
       }
     } finally {
@@ -251,8 +254,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
       // If user thread exists, then quit !
       userThread.isAlive) {
-      this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+      if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+        finishApplicationMaster(FinalApplicationStatus.FAILED,
+          "max number of worker failures reached")
+      }
+      yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
       ApplicationMaster.incrementAllocatorLoop(1)
       Thread.sleep(100)
     }
@@ -268,21 +274,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
       val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-      // must be <= timeoutInterval/ 2.
-      // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
-      // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-      val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+
+      // we want to be reasonably responsive without causing too many requests to RM.
+      val schedulerInterval =
+        System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+      // must be <= timeoutInterval / 2.
+      val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
       launchReporterThread(interval)
     }
   }

+  // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
+          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+            finishApplicationMaster(FinalApplicationStatus.FAILED,
+              "max number of worker failures reached")
+          }
           val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
           if (missingWorkerCount > 0) {
             logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -321,7 +333,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
   */

-  def finishApplicationMaster(status: FinalApplicationStatus) {
+  def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
     synchronized {
       if (isFinished) {
@@ -335,6 +347,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
         .asInstanceOf[FinishApplicationMasterRequest]
       finishReq.setAppAttemptId(appAttemptId)
       finishReq.setFinishApplicationStatus(status)
+      finishReq.setDiagnostics(diagnostics)
       // set tracking url to empty since we don't have a history server
       finishReq.setTrackingUrl("")
       resourceManager.finishApplicationMaster(finishReq)
...
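To make the interval arithmetic above concrete (values illustrative, taken from the defaults shown in the diff): with YARN's AM expiry at its 120000 ms default, the heartbeat is capped at 60000 ms, so the default 5000 ms setting is used as-is and the ResourceManager can never expire the ApplicationMaster between heartbeats:

val timeoutInterval = 120000L  // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS default
val schedulerInterval = 5000L  // spark.yarn.scheduler.heartbeat.interval-ms default
// Clamp to half the expiry so at least two heartbeats fit in every
// expiry window: math.min(120000 / 2, 5000) == 5000.
val interval = math.min(timeoutInterval / 2, schedulerInterval)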
@@ -57,6 +57,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)

   def run() {
+    validateArgs()
+
     init(yarnConf)
     start()
     logClusterResourceDetails()
@@ -81,6 +83,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     System.exit(0)
   }

+  def validateArgs() = {
+    Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+      (args.userJar == null) -> "Error: You must specify a user jar!",
+      (args.userClass == null) -> "Error: You must specify a user class!",
+      (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+        ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+      (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+        ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
+    .foreach { case(cond, errStr) =>
+      if (cond) {
+        logError(errStr)
+        args.printUsageAndExit(1)
+      }
+    }
+  }
+
   def getAppStagingDir(appId: ApplicationId): String = {
     SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
   }
@@ -94,7 +113,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
       ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
   }
-
   def verifyClusterResources(app: GetNewApplicationResponse) = {
     val maxMem = app.getMaximumResourceCapability().getMemory()
@@ -212,11 +230,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()

-    if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
-      logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
-      System.exit(1)
-    }
-
     Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
       Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
     .foreach { case(destName, _localPath) =>
@@ -331,7 +344,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     JAVA_OPTS += " -Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
     // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
     // node, spark gc effects all other containers performance (which can also be other spark containers)
@@ -357,11 +369,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
     }

-    if (args.userClass == null) {
-      logError("Error: You must specify a user class!")
-      System.exit(1)
-    }
-
     val commands = List[String](javaCommand +
       " -server " +
       JAVA_OPTS +
@@ -439,6 +446,7 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
+
     val args = new ClientArguments(argStrings)
     new Client(args).run
   }
...
@@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
   // Used to generate a unique id per worker
   private val workerIdCounter = new AtomicInteger()
   private val lastResponseId = new AtomicInteger()
+  private val numWorkersFailed = new AtomicInteger()

   def getNumWorkersRunning: Int = numWorkersRunning.intValue
+  def getNumWorkersFailed: Int = numWorkersFailed.intValue

   def isResourceConstraintSatisfied(container: Container): Boolean = {
     container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
@@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
         else {
           // simply decrement count - next iteration of ReporterThread will take care of allocating !
           numWorkersRunning.decrementAndGet()
-          logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
-            " httpaddress: " + completedContainer.getDiagnostics)
+          logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
+            " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
+
+          // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+          // there are some exit status' we shouldn't necessarily count against us, but for
+          // now I think its ok as none of the containers are expected to exit
+          if (completedContainer.getExitStatus() != 0) {
+            logInfo("Container marked as failed: " + containerId)
+            numWorkersFailed.incrementAndGet()
+          }
         }

         allocatedHostToContainersMap.synchronized {
@@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
     val releasedContainerList = createReleasedContainerList()
     req.addAllReleases(releasedContainerList)
-
     if (numWorkers > 0) {
-
       logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
     }
...
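A condensed sketch of the failure-accounting mechanism these hunks add (the class and method names below are illustrative, not Spark's): a thread-safe counter is bumped for each container that exits with a nonzero status, and the allocator loop and reporter thread compare it against the configurable ceiling:

import java.util.concurrent.atomic.AtomicInteger

// Hypothetical distillation of the pattern in YarnAllocationHandler and
// ApplicationMaster above.
class WorkerFailureTracker(numWorkers: Int) {
  private val numWorkersFailed = new AtomicInteger()
  private val maxNumWorkerFailures =
    System.getProperty("spark.yarn.max.worker.failures",
      math.max(numWorkers * 2, 3).toString).toInt

  // Called when the RM reports a completed container.
  def recordCompletion(exitStatus: Int) {
    if (exitStatus != 0) numWorkersFailed.incrementAndGet()
  }

  // Polled by the allocator loop / reporter thread before allocating more.
  def limitReached: Boolean = numWorkersFailed.intValue >= maxNumWorkerFailures
}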