Commit d513c99c authored by sharkd, committed by Marcelo Vanzin

[SPARK-16414][YARN] Fix bug: "Cannot get user config when calling SparkHadoopUtil.get.conf in yarn cluster mode"

## What changes were proposed in this pull request?

When deploying Spark in yarn cluster mode, the `SparkHadoopUtil` singleton was instantiated before `ApplicationMaster` in `ApplicationMaster.main`, so the `conf` in the `SparkHadoopUtil` singleton did not include the user's configuration.

So we should load the properties file containing the Spark configuration and set its entries as system properties before `SparkHadoopUtil` is first instantiated.
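
To see the ordering hazard in isolation, here is a minimal, self-contained Scala sketch (illustrative names only, not the actual Spark internals): a singleton that snapshots system properties at first access never sees properties set afterwards.

```scala
// Sketch of the init-order bug: the singleton snapshots sys.props at first
// access (much like SparkHadoopUtil builds its Hadoop Configuration once),
// so properties set after that point are invisible to it.
object ConfSingleton {
  lazy val conf: Map[String, String] = sys.props.toMap  // snapshot, never refreshed
}

object InitOrderDemo {
  def main(args: Array[String]): Unit = {
    ConfSingleton.conf                          // first access: snapshot taken here
    sys.props("spark.hadoop.key") = "value"     // set too late
    assert(!ConfSingleton.conf.contains("spark.hadoop.key"))  // entry is missing
  }
}
```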

## How was this patch tested?

Added a test case to `YarnClusterSuite`.

Author: sharkd <sharkd.tu@gmail.com>
Author: sharkdtu <sharkdtu@tencent.com>

Closes #14088 from sharkdtu/master.
parent c377e49e
@@ -50,14 +50,6 @@ private[spark] class ApplicationMaster(
     client: YarnRMClient)
   extends Logging {
 
-  // Load the properties file with the Spark configuration and set entries as system properties,
-  // so that user code run inside the AM also has access to them.
-  if (args.propertiesFile != null) {
-    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
-      sys.props(k) = v
-    }
-  }
-
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
@@ -743,6 +735,15 @@ object ApplicationMaster extends Logging {
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
+
+    // Load the properties file with the Spark configuration and set entries as system properties,
+    // so that user code run inside the AM also has access to them.
+    // Note: we must do this before SparkHadoopUtil is instantiated.
+    if (amArgs.propertiesFile != null) {
+      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
+        sys.props(k) = v
+      }
+    }
     SparkHadoopUtil.get.runAsSparkUser { () =>
       master = new ApplicationMaster(amArgs, new YarnRMClient)
       System.exit(master.run())
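
With the load moved ahead of the first `SparkHadoopUtil` access, user code running inside the AM can read `spark.hadoop.*` entries from its properties file through `SparkHadoopUtil.get.conf`. A hedged usage sketch (the `spark.hadoop.key` entry is illustrative and mirrors the test below; Spark copies such entries into the Hadoop `Configuration` with the `spark.hadoop.` prefix stripped):

```scala
// Usage sketch, assuming the user's properties file (spark-defaults.conf or
// the file passed via --properties-file) contains: spark.hadoop.key  value
import org.apache.spark.deploy.SparkHadoopUtil

object ReadUserHadoopConf {
  def main(args: Array[String]): Unit = {
    // After this fix, the Hadoop Configuration includes the user's
    // spark.hadoop.* entries even in yarn cluster mode.
    val hadoopConf = SparkHadoopUtil.get.conf
    println(s"key = ${hadoopConf.get("key")}")  // expected: value
  }
}
```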
@@ -32,6 +32,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
@@ -106,6 +107,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     ))
   }
 
+  test("run Spark in yarn-cluster mode using SparkHadoopUtil.conf") {
+    testYarnAppUseSparkHadoopUtilConf()
+  }
+
   test("run Spark in yarn-client mode with additional jar") {
     testWithAddJar(true)
   }
@@ -181,6 +186,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
+  private def testYarnAppUseSparkHadoopUtilConf(): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(false,
+      mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
+      appArgs = Seq("key=value", result.getAbsolutePath()),
+      extraConf = Map("spark.hadoop.key" -> "value"))
+    checkResult(finalState, result)
+  }
+
   private def testWithAddJar(clientMode: Boolean): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
     val driverResult = File.createTempFile("driver", null, tempDir)
@@ -274,6 +288,37 @@ private object YarnClusterDriverWithFailure extends Logging with Matchers {
   }
 }
 
+private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      // scalastyle:off println
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] [result file]
+        """.stripMargin)
+      // scalastyle:on println
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+      .setAppName("yarn test using SparkHadoopUtil's conf"))
+
+    val kv = args(0).split("=")
+    val status = new File(args(1))
+    var result = "failure"
+    try {
+      SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1))
+      result = "success"
+    } finally {
+      Files.write(result, status, StandardCharsets.UTF_8)
+      sc.stop()
+    }
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
   val WAIT_TIMEOUT_MILLIS = 10000