Commit bce0897b authored by witgo, committed by Thomas Graves

[SPARK-2051] In yarn.ClientBase spark.yarn.dist.* do not work

Author: witgo <witgo@qq.com>

Closes #969 from witgo/yarn_ClientBase and squashes the following commits:

8117765 [witgo] review commit
3bdbc52 [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase
5261b6c [witgo] fix sys.props.get("SPARK_YARN_DIST_FILES")
e3c1107 [witgo] update docs
b6a9aa1 [witgo] merge master
c8b4554 [witgo] review commit
2f48789 [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase
8d7b82f [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase
1048549 [witgo] remove Utils.resolveURIs
871f1db [witgo] add spark.yarn.dist.* documentation
41bce59 [witgo] review commit
35d6fa0 [witgo] move to ClientArguments
55d72fc [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase
9cdff16 [witgo] review commit
8bc2f4b [witgo] review commit
20e667c [witgo] Merge branch 'master' into yarn_ClientBase
0961151 [witgo] merge master
ce609fc [witgo] Merge branch 'master' into yarn_ClientBase
8362489 [witgo] yarn.ClientBase spark.yarn.dist.* do not work
parent 67fca189
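For context, the two properties this commit wires up can be set on a SparkConf (or in spark-defaults.conf) for both yarn-client and yarn-cluster mode. A minimal usage sketch, assuming hypothetical local files that should land in every executor's working directory:

import org.apache.spark.SparkConf

// Hypothetical example: ship two local files and one archive to each
// executor's working directory. After this change, paths without a
// scheme are resolved to file:// URIs by ClientArguments.
val conf = new SparkConf()
  .setAppName("dist-example")
  .set("spark.yarn.dist.files", "/etc/myapp/app.conf,/etc/myapp/lookup.csv")
  .set("spark.yarn.dist.archives", "hdfs:///shared/deps.zip")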
docs/running-on-yarn.md
@@ -68,15 +68,29 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.executor.memoryOverhead</code></td>
-  <td>384</code></td>
+  <td><code>spark.yarn.dist.archives</code></td>
+  <td>(none)</td>
+  <td>
+    Comma separated list of archives to be extracted into the working directory of each executor.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.dist.files</code></td>
+  <td>(none)</td>
+  <td>
+    Comma-separated list of files to be placed in the working directory of each executor.
+  <td>
+</tr>
+<tr>
+  <td><code>spark.yarn.executor.memoryOverhead</code></td>
+  <td>384</td>
   <td>
     The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
 </tr>
 <tr>
   <td><code>spark.yarn.driver.memoryOverhead</code></td>
-  <td>384</code></td>
+  <td>384</td>
   <td>
     The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
ClientArguments.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}

 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.InputFormatInfo
-import org.apache.spark.util.IntParam
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}

 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
@@ -45,6 +44,18 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {

   parseArgs(args.toList)

+  // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
+  // it should default to hdfs://
+  files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
+  archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+
+  // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
+  // for both yarn-client and yarn-cluster
+  files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
+    map(p => Utils.resolveURIs(p)).orNull)
+  archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
+    map(p => Utils.resolveURIs(p)).orNull)
+
   private def parseArgs(inputArgs: List[String]): Unit = {
     val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
     val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
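The key behavioral change above is that spark.yarn.dist.* values are now passed through Utils.resolveURIs, so scheme-less paths default to file:// instead of the Hadoop default filesystem. A minimal sketch of that resolution (not Spark's actual private helper, whose signature may differ):

import java.io.File
import java.net.URI

// Resolve each comma-separated path: keep it when it already carries a
// scheme, otherwise turn it into an absolute file:// URI.
def resolveURIsSketch(paths: String): String =
  paths.split(",").filter(_.nonEmpty).map { p =>
    val uri = new URI(p)
    if (uri.getScheme != null) uri.toString
    else new File(p).getAbsoluteFile.toURI.toString
  }.mkString(",")

Note the precedence the getOrElse chain sets up: an explicit --files/--archives argument wins, then the SPARK_YARN_DIST_FILES/SPARK_YARN_DIST_ARCHIVES environment variables, and finally the spark.yarn.dist.* properties.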
ClientBase.scala
@@ -162,7 +162,7 @@ trait ClientBase extends Logging {
     val fs = FileSystem.get(conf)
     val remoteFs = originalPath.getFileSystem(conf)
     var newPath = originalPath
-    if (! compareFs(remoteFs, fs)) {
+    if (!compareFs(remoteFs, fs)) {
       newPath = new Path(dstDir, originalPath.getName())
       logInfo("Uploading " + originalPath + " to " + newPath)
       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
@@ -250,6 +250,7 @@ trait ClientBase extends Logging {
         }
       }
     }
+    logInfo("Prepared Local resources " + localResources)
     sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))

     UserGroupInformation.getCurrentUser().addCredentials(credentials)
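The surrounding copy logic in ClientBase only uploads a resource when it lives on a different filesystem than the destination. A self-contained sketch of that pattern, assuming Hadoop's FileSystem API and using a plain URI comparison as a stand-in for Spark's compareFs:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Copy `originalPath` into `dstDir` on the default filesystem only if the
// source lives on a different filesystem; otherwise reuse it in place.
def copyIfForeign(originalPath: Path, dstDir: Path, conf: Configuration): Path = {
  val fs = FileSystem.get(conf)                    // destination (default) fs
  val remoteFs = originalPath.getFileSystem(conf)  // fs the source lives on
  if (remoteFs.getUri != fs.getUri) {              // stand-in for compareFs
    val newPath = new Path(dstDir, originalPath.getName())
    FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
    newPath
  } else {
    originalPath
  }
}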
YarnClientSchedulerBackend.scala
@@ -70,9 +70,7 @@ private[spark] class YarnClientSchedulerBackend(
       ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
       ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
       ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"),
-      ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
-      ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"))
+      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
     .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
     logDebug("ClientArguments called with: " + argsArrayBuf)
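With the --files/--archives mappings removed from the backend, the environment variables are consumed in one place, ClientArguments. End to end, each distributed file then appears in the container's working directory under its base name, so executor-side code can open it with a relative path. A hypothetical executor-side snippet, assuming app.conf was shipped via spark.yarn.dist.files:

import scala.io.Source

// app.conf was distributed to every executor, so it sits in the
// container's working directory under its base name.
val appConf = Source.fromFile("app.conf").getLines().toList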