Commit b8487713 authored by Marcelo Vanzin, committed by Patrick Wendell

[SPARK-2778] [yarn] Add yarn integration tests.

This patch adds a couple of (currently very simple) integration tests
to make sure both client and cluster modes are working. The tests don't
do much yet other than run a simple job, but the plan is to enhance
them once the framework is in.

The cluster tests are noisy, so redirect all log output to a file,
like other tests do. Copying the conf around sucks, but it's less
work than messing with maven/sbt and having to clean up other
projects.

Note the test is only added for yarn-stable. The code compiles
against yarn-alpha, but there are two issues I ran into that I
could not overcome:
- an old netty dependency kept creeping into the classpath and
  causing akka to not work when using sbt; the old netty was
  correctly suppressed under maven.
- MiniYARNCluster kept failing to execute containers because it
  did not create the NM's local dir itself; this is apparently
  known behavior, but I'm not sure how to work around it.

None of those issues are present with the stable Yarn.
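
(For the record, a possible workaround sketch for the MiniYARNCluster
issue: pre-create the NM local dir and point the config at it before the
cluster starts. This is untested against yarn-alpha, and the config
constant below is the stable-API name, so treat it as a sketch only.)

import java.io.File
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster

val nmLocal = new File(System.getProperty("java.io.tmpdir"), "nm-local-dir")
nmLocal.mkdirs() // pre-create the dir the NM fails to create on its own

val conf = new YarnConfiguration()
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocal.getAbsolutePath())

val cluster = new MiniYARNCluster("workaround-test", 1, 1, 1)
cluster.init(conf)
cluster.start()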

Also, these tests are a little slow to run. Apparently Spark doesn't
yet tag tests (so that these could be isolated in a "slow" batch),
so this is something to keep in mind.
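
(If tagging ever happens, ScalaTest's Tag mechanism is the obvious fit.
A sketch, with a made-up tag name, so runners could include or exclude
the slow batch:)

import org.scalatest.{FunSuite, Tag}

object SlowTest extends Tag("org.apache.spark.tags.SlowTest") // tag name made up

class ExampleSuite extends FunSuite {
  // Runners can then include or exclude tests carrying this tag.
  test("a slow yarn integration test", SlowTest) {
    // ... slow test body ...
  }
}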

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #2257 from vanzin/yarn-tests and squashes the following commits:

6d5b84e [Marcelo Vanzin] Fix wrong system property being set.
8b0933d [Marcelo Vanzin] Merge branch 'master' into yarn-tests
5c2b56f [Marcelo Vanzin] Use custom log4j conf for Yarn containers.
ec73f17 [Marcelo Vanzin] More review feedback.
67f5b02 [Marcelo Vanzin] Review feedback.
f01517c [Marcelo Vanzin] Review feedback.
68fbbbf [Marcelo Vanzin] Use older constructor available in older Hadoop releases.
d07ef9a [Marcelo Vanzin] Merge branch 'master' into yarn-tests
add8416 [Marcelo Vanzin] [SPARK-2778] [yarn] Add yarn integration tests.
parent 8ca4ecb6
@@ -712,6 +712,35 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <version>${yarn.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.ow2.asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
@@ -1187,7 +1216,7 @@
       <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
         <version>3.4.5-mapr-1406</version>
       </dependency>
     </dependencies>
   </profile>
......
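
(A quick way to verify that the exclusions above keep the old
org.jboss.netty artifact off the test classpath is the Maven dependency
plugin's tree goal; the profile flag is illustrative:

  mvn -Pyarn dependency:tree -Dincludes=org.jboss.netty

No matches under that filter means the exclusion is working.)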
@@ -401,17 +401,17 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
           // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
           status = FinalApplicationStatus.SUCCEEDED
         } catch {
-          case e: InvocationTargetException => {
+          case e: InvocationTargetException =>
             e.getCause match {
-              case _: InterruptedException => {
+              case _: InterruptedException =>
                 // Reporter thread can interrupt to stop user class
-              }
+              case e => throw e
             }
-          }
         } finally {
           logDebug("Finishing main")
+          finalStatus = status
         }
-        finalStatus = status
       }
     }
     userClassThread.setName("Driver")
......
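
The change above does two things: non-interrupt causes wrapped in an
InvocationTargetException are now rethrown instead of silently swallowed,
and finalStatus is assigned inside the finally block so it is recorded on
every exit path. A minimal, self-contained sketch of that pattern (names
and status strings are illustrative, not Spark's):

import java.lang.reflect.InvocationTargetException

def runUser(body: => Unit): String = {
  var status = "FAILED"
  var finalStatus = "UNDEFINED"
  try {
    body
    status = "SUCCEEDED"
  } catch {
    case e: InvocationTargetException =>
      e.getCause match {
        case _: InterruptedException => // expected when the reporter thread stops us
        case cause => throw cause
      }
  } finally {
    finalStatus = status // runs whether body succeeded, was interrupted, or threw
  }
  finalStatus
}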
@@ -348,7 +348,7 @@ private[spark] trait ClientBase extends Logging {
     }

     // For log4j configuration to reference
-    javaOpts += "-D=spark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

     val userClass =
       if (args.userClass != null) {
......
@@ -98,7 +98,7 @@ trait ExecutorRunnableUtil extends Logging {
     */

    // For log4j configuration to reference
-    javaOpts += "-D=spark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
      "-server",
......
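
Note on the two "-D" fixes above: the stray "=" after -D produced a
malformed system property definition, so spark.yarn.app.container.log.dir
was never actually set. With the fix, a container's log4j configuration
can pick up the log directory through log4j's system-property
substitution. A minimal sketch (appender and file names are illustrative,
not from this patch):

log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n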
@@ -140,7 +140,6 @@
         <configuration>
           <environmentVariables>
             <SPARK_HOME>${basedir}/../..</SPARK_HOME>
-            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
           </environmentVariables>
         </configuration>
       </plugin>
@@ -148,7 +147,7 @@
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <resources>
       <resource>
         <directory>../common/src/main/resources</directory>
......
@@ -32,4 +32,13 @@
   <packaging>jar</packaging>
   <name>Spark Project YARN Stable API</name>

+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
 </project>
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
org.eclipse.jetty.LEVEL=WARN
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn

import java.io.File

import scala.collection.JavaConversions._

import com.google.common.base.Charsets
import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {

  // log4j configuration for the Yarn containers, so that their output is collected
  // by Yarn instead of trying to overwrite unit-tests.log.
  private val LOG4J_CONF = """
    |log4j.rootCategory=DEBUG, console
    |log4j.appender.console=org.apache.log4j.ConsoleAppender
    |log4j.appender.console.target=System.err
    |log4j.appender.console.layout=org.apache.log4j.PatternLayout
    |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    """.stripMargin

  private var yarnCluster: MiniYARNCluster = _
  private var tempDir: File = _
  private var fakeSparkJar: File = _
  private var oldConf: Map[String, String] = _

  override def beforeAll() {
    tempDir = Utils.createTempDir()

    val logConfDir = new File(tempDir, "log4j")
    logConfDir.mkdir()

    val logConfFile = new File(logConfDir, "log4j.properties")
    Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)

    val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
      sys.props("java.class.path")

    oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap

    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
    yarnCluster.init(new YarnConfiguration())
    yarnCluster.start()
    yarnCluster.getConfig().foreach { e =>
      sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
    }

    fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
    sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
    sys.props += ("spark.executor.instances" -> "1")
    sys.props += ("spark.driver.extraClassPath" -> childClasspath)
    sys.props += ("spark.executor.extraClassPath" -> childClasspath)

    super.beforeAll()
  }

  override def afterAll() {
    yarnCluster.stop()
    sys.props.retain { case (k, v) => !k.startsWith("spark.") }
    sys.props ++= oldConf
    super.afterAll()
  }
test("run Spark in yarn-client mode") {
var result = File.createTempFile("result", null, tempDir)
YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
checkResult(result)
}
test("run Spark in yarn-cluster mode") {
val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
var result = File.createTempFile("result", null, tempDir)
// The Client object will call System.exit() after the job is done, and we don't want
// that because it messes up the scalatest monitoring. So replicate some of what main()
// does here.
val args = Array("--class", main,
"--jar", "file:" + fakeSparkJar.getAbsolutePath(),
"--arg", "yarn-cluster",
"--arg", result.getAbsolutePath(),
"--num-executors", "1")
val sparkConf = new SparkConf()
val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val clientArgs = new ClientArguments(args, sparkConf)
new Client(clientArgs, yarnConf, sparkConf).run()
checkResult(result)
}
  /**
   * This is a workaround for an issue with yarn-cluster mode: the Client class does not
   * report an error when the job process finishes successfully but the job itself has
   * failed. So the tests require the job to write "success" to a file once everything
   * is ok, to indicate that it actually succeeded.
   */
  private def checkResult(result: File) = {
    val resultString = Files.toString(result, Charsets.UTF_8)
    resultString should be ("success")
  }

}
private object YarnClusterDriver extends Logging with Matchers {

  def main(args: Array[String]) = {
    if (args.length != 2) {
      System.err.println(
        s"""
        |Invalid command line: ${args.mkString(" ")}
        |
        |Usage: YarnClusterDriver [master] [result file]
        """.stripMargin)
      System.exit(1)
    }

    val sc = new SparkContext(new SparkConf().setMaster(args(0))
      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
    val status = new File(args(1))
    var result = "failure"
    try {
      val data = sc.parallelize(1 to 4, 4).collect().toSet
      data should be (Set(1, 2, 3, 4))
      result = "success"
    } finally {
      sc.stop()
      Files.write(result, status, Charsets.UTF_8)
    }
  }

}
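
For reference, one way to run just this suite; the profile and module
names below are illustrative and depend on the build setup of the day:

  # sbt
  sbt/sbt -Pyarn "test-only org.apache.spark.deploy.yarn.YarnClusterSuite"

  # maven
  mvn -Pyarn -pl yarn/stable -am test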