Skip to content
Snippets Groups Projects
Commit fbe8e985 authored by Marcelo Vanzin's avatar Marcelo Vanzin Committed by Andrew Or
Browse files

[SPARK-2778] [yarn] Add workaround for race in MiniYARNCluster.

Sometimes the cluster's start() method returns before the configuration
having been updated, which is done by ClientRMService in, I assume, a
separate thread (otherwise there would be no race). That can cause tests
to fail if the old configuration data is read, since it will contain
the wrong RM address.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #2605 from vanzin/SPARK-2778 and squashes the following commits:

8d02ce0 [Marcelo Vanzin] Minor cleanup.
5bebee7 [Marcelo Vanzin] [SPARK-2778] [yarn] Add workaround for race in MiniYARNCluster.
parent 22f8e1ee
No related branches found
No related tags found
No related merge requests found
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.spark.deploy.yarn package org.apache.spark.deploy.yarn
import java.io.File import java.io.File
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
...@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext} ...@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils import org.apache.spark.util.Utils
class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers { class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
// log4j configuration for the Yarn containers, so that their output is collected // log4j configuration for the Yarn containers, so that their output is collected
// by Yarn instead of trying to overwrite unit-tests.log. // by Yarn instead of trying to overwrite unit-tests.log.
...@@ -66,7 +67,33 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers { ...@@ -66,7 +67,33 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1) yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster.init(new YarnConfiguration()) yarnCluster.init(new YarnConfiguration())
yarnCluster.start() yarnCluster.start()
yarnCluster.getConfig().foreach { e =>
// There's a race in MiniYARNCluster in which start() may return before the RM has updated
// its address in the configuration. You can see this in the logs by noticing that when
// MiniYARNCluster prints the address, it still has port "0" assigned, although later the
// test works sometimes:
//
// INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
//
// That log message prints the contents of the RM_ADDRESS config variable. If you check it
// later on, it looks something like this:
//
// INFO YarnClusterSuite: RM address in configuration is blah:42631
//
// This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
// done so in a timely manner (defined to be 10 seconds).
val config = yarnCluster.getConfig()
val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
if (System.currentTimeMillis() > deadline) {
throw new IllegalStateException("Timed out waiting for RM to come up.")
}
logDebug("RM address still not set in configuration, waiting...")
TimeUnit.MILLISECONDS.sleep(100)
}
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
config.foreach { e =>
sys.props += ("spark.hadoop." + e.getKey() -> e.getValue()) sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
} }
...@@ -86,13 +113,13 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers { ...@@ -86,13 +113,13 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
super.afterAll() super.afterAll()
} }
ignore("run Spark in yarn-client mode") { test("run Spark in yarn-client mode") {
var result = File.createTempFile("result", null, tempDir) var result = File.createTempFile("result", null, tempDir)
YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath())) YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
checkResult(result) checkResult(result)
} }
ignore("run Spark in yarn-cluster mode") { test("run Spark in yarn-cluster mode") {
val main = YarnClusterDriver.getClass.getName().stripSuffix("$") val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
var result = File.createTempFile("result", null, tempDir) var result = File.createTempFile("result", null, tempDir)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment