Commit 9b731fad authored by Dennis Huo, committed by Sandy Ryza

[SPARK-9782] [YARN] Support YARN application tags via SparkConf

Add a new test case in yarn/ClientSuite which checks how the various SparkConf
and ClientArguments propagate into the ApplicationSubmissionContext.

Author: Dennis Huo <dhuo@google.com>

Closes #8072 from dennishuo/dhuo-yarn-application-tags.
parent 80cb25b2
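As a quick orientation (not part of the diff): with this patch applied, tags are supplied through the normal SparkConf mechanism. A minimal sketch, with the app name, tag values, and attempt count made up for illustration:

import org.apache.spark.SparkConf

// Hypothetical driver-side settings; only the keys are real.
// spark.yarn.tags is the key introduced by this patch, and
// spark.yarn.maxAppAttempts is the existing setting exercised by the new test.
val sparkConf = new SparkConf()
  .setAppName("my-tagged-app")
  .set("spark.yarn.tags", "nightly,etl,team-data")
  .set("spark.yarn.maxAppAttempts", "2")

The same key can also be passed on the command line via spark-submit's --conf option.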
docs/running-on-yarn.md
@@ -319,6 +319,14 @@ If you need a reference to the proper location to put log files in the YARN so t
running against earlier versions, this property will be ignored.
</td>
</tr>
<tr>
<td><code>spark.yarn.tags</code></td>
<td>(none)</td>
<td>
Comma-separated list of strings to pass through as YARN application tags appearing
in YARN ApplicationReports, which can be used for filtering when querying YARN apps.
</td>
</tr>
<tr>
<td><code>spark.yarn.keytab</code></td>
<td>(none)</td>
......
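The description above says the tags can be used for filtering when querying YARN. As a rough sketch of what that might look like on the client side, assuming Hadoop 2.4+ where ApplicationReport exposes getApplicationTags (the tag value "nightly" is illustrative):

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Sketch only: list applications and keep those carrying a given tag.
val yarnClient = YarnClient.createYarnClient()
yarnClient.init(new YarnConfiguration())
yarnClient.start()
try {
  val tagged = yarnClient.getApplications().asScala
    .filter(_.getApplicationTags.asScala.contains("nightly"))
  tagged.foreach(app => println(s"${app.getApplicationId} ${app.getName}"))
} finally {
  yarnClient.stop()
}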
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -163,6 +163,23 @@ private[spark] class Client(
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
  .map(StringUtils.getTrimmedStringCollection(_))
  .filter(!_.isEmpty())
  .foreach { tagCollection =>
    try {
      // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
      // reflection to set it, printing a warning if a tag was specified but the YARN version
      // doesn't support it.
      val method = appContext.getClass().getMethod(
        "setApplicationTags", classOf[java.util.Set[String]])
      method.invoke(appContext, new java.util.HashSet[String](tagCollection))
    } catch {
      case e: NoSuchMethodException =>
        logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
          "YARN does not support it")
    }
  }
sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
  case Some(v) => appContext.setMaxAppAttempts(v)
  case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
@@ -987,6 +1004,10 @@ object Client extends Logging {
// of the executors
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
// Comma-separated list of strings to pass through as YARN application tags appearing
// in YARN ApplicationReports, which can be used for filtering when querying YARN.
val CONF_SPARK_YARN_APPLICATION_TAGS = "spark.yarn.tags"
// Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission =
FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
......
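Reflection is used above because Spark still compiles against pre-2.4 YARN APIs. For comparison, on a Hadoop 2.4+ classpath the non-reflective equivalent would look roughly like this sketch (not part of the patch; the helper name is made up):

import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext

// Assumes setApplicationTags is available at compile time (Hadoop 2.4+).
// `appContext` and `rawTags` stand in for the values Client derives from SparkConf.
def setTagsDirectly(appContext: ApplicationSubmissionContext, rawTags: String): Unit = {
  val tags = StringUtils.getTrimmedStringCollection(rawTags)
  if (!tags.isEmpty) {
    appContext.setApplicationTags(new java.util.HashSet[String](tags))
  }
}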
yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -29,8 +29,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.YarnClientApplication
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -170,6 +173,39 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
cp should contain ("/remotePath/my1.jar")
}
test("configuration and args propagate through createApplicationSubmissionContext") {
val conf = new Configuration()
// When parsing tags, duplicates and leading/trailing whitespace should be removed.
// Spaces between non-comma strings should be preserved as single tags. Empty strings may or
// may not be removed depending on the version of Hadoop being used.
val sparkConf = new SparkConf()
.set(Client.CONF_SPARK_YARN_APPLICATION_TAGS, ",tag1, dup,tag2 , ,multi word , dup")
.set("spark.yarn.maxAppAttempts", "42")
val args = new ClientArguments(Array(
"--name", "foo-test-app",
"--queue", "staging-queue"), sparkConf)
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
val client = new Client(args, conf, sparkConf)
client.createApplicationSubmissionContext(
new YarnClientApplication(getNewApplicationResponse, appContext),
containerLaunchContext)
appContext.getApplicationName should be ("foo-test-app")
appContext.getQueue should be ("staging-queue")
appContext.getAMContainerSpec should be (containerLaunchContext)
appContext.getApplicationType should be ("SPARK")
appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method =>
val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]]
tags should contain allOf ("tag1", "dup", "tag2", "multi word")
tags.filter(!_.isEmpty).size should be (4)
}
appContext.getMaxAppAttempts should be (42)
}
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
......
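The parsing expectations spelled out in the test comments come from Hadoop's StringUtils, which Client uses to split the raw config value. A small illustration with the same input string the test uses:

import org.apache.hadoop.util.StringUtils

// Illustration only: trimming yields "tag1", "dup", "tag2" and "multi word"
// (plus, on some Hadoop versions, an empty entry); any remaining duplicates
// are collapsed when Client copies the result into a java.util.HashSet.
val raw = ",tag1, dup,tag2 , ,multi word , dup"
val parsed = StringUtils.getTrimmedStringCollection(raw)
println(parsed)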