Commit f6680cdc authored by Thomas Graves, committed by Marcelo Vanzin

[SPARK-11555] spark on yarn spark-class --num-workers doesn't work

I tested the various ways of specifying the number of executors with both spark-submit and spark-class, in both client and cluster mode where applicable:

--num-workers, --num-executors, spark.executor.instances, SPARK_EXECUTOR_INSTANCES, and the default when nothing is supplied (the resulting precedence is sketched below).

Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>

Closes #9523 from tgravescs/SPARK-11555.
parent c447c9d5
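
For context, the fallback chain this patch establishes in the non-dynamic-allocation path can be sketched as follows. This is a minimal standalone sketch, not the actual Spark code; resolveNumExecutors is a hypothetical helper, and the default of 2 (Spark's DEFAULT_NUMBER_EXECUTORS) is an assumption:

// Hypothetical helper mirroring the patched getInitialTargetExecutorNumber:
// spark.executor.instances beats SPARK_EXECUTOR_INSTANCES, which beats the
// CLI --num-executors/--num-workers value, which beats the built-in default.
def resolveNumExecutors(
    confInstances: Option[Int],   // spark.executor.instances, if set
    envInstances: Option[Int],    // SPARK_EXECUTOR_INSTANCES, if set
    cliNumExecutors: Option[Int], // --num-executors / --num-workers, if given
    default: Int = 2): Int = {    // DEFAULT_NUMBER_EXECUTORS, assumed to be 2
  confInstances
    .orElse(envInstances)
    .orElse(cliNumExecutors)
    .getOrElse(default)
}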
@@ -81,7 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orNull
     // If dynamic allocation is enabled, start at the configured initial number of executors.
     // Default to minExecutors if no initialExecutors is set.
-    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
+    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
     principal = Option(principal)
       .orElse(sparkConf.getOption("spark.yarn.principal"))
       .orNull
@@ -392,8 +392,11 @@ object YarnSparkHadoopUtil {
   /**
    * Getting the initial target number of executors depends on whether dynamic allocation is
    * enabled.
+   * If not using dynamic allocation it gets the number of executors requested by the user.
    */
-  def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+  def getInitialTargetExecutorNumber(
+      conf: SparkConf,
+      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
     if (Utils.isDynamicAllocationEnabled(conf)) {
       val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
       val initialNumExecutors =
@@ -406,7 +409,7 @@ object YarnSparkHadoopUtil {
       initialNumExecutors
     } else {
       val targetNumExecutors =
-        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors)
       // System property can override environment variable.
       conf.getInt("spark.executor.instances", targetNumExecutors)
     }
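
Assuming the hypothetical resolveNumExecutors sketch above, the patched fallback chain resolves like this (values are illustrative):

resolveNumExecutors(None, None, None)           // => 2, built-in default
resolveNumExecutors(None, None, Some(5))        // => 5, --num-workers now honored (the bug fix)
resolveNumExecutors(None, Some(8), Some(5))     // => 8, env var overrides the CLI value
resolveNumExecutors(Some(10), Some(8), Some(5)) // => 10, spark.executor.instances wins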