From 0bd1900cbce5946999c38293852d8ccd4f838930 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@databricks.com>
Date: Sun, 29 Dec 2013 15:38:46 -0500
Subject: [PATCH] Fix a few settings that were being read as system properties after merge

---
 .../spark/scheduler/TaskSchedulerImpl.scala |  4 +++-
 .../spark/scheduler/TaskSetManager.scala    | 18 ++++++++++--------
 .../streaming/scheduler/JobScheduler.scala  |  4 ++--
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 56a038dc69..bffd990e16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -47,10 +47,12 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
  */
 private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
-    val maxTaskFailures: Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+    val maxTaskFailures: Int,
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
+  def this(sc: SparkContext) = this(sc, sc.conf.getOrElse("spark.task.maxFailures", "4").toInt)
+
   val conf = sc.conf
 
   // How often to check for speculative tasks
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 9b95e418d8..d752e6f111 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -54,12 +54,14 @@ private[spark] class TaskSetManager(
     clock: Clock = SystemClock)
   extends Schedulable with Logging
 {
+  val conf = sched.sc.conf
+
   // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+  val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
-  val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+  val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
+  val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -118,7 +120,7 @@ private[spark] class TaskSetManager(
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
-    System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+    conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
 
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
@@ -682,14 +684,14 @@ private[spark] class TaskSetManager(
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = System.getProperty("spark.locality.wait", "3000")
+    val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
     level match {
       case TaskLocality.PROCESS_LOCAL =>
-        System.getProperty("spark.locality.wait.process", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
       case TaskLocality.NODE_LOCAL =>
-        System.getProperty("spark.locality.wait.node", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
       case TaskLocality.RACK_LOCAL =>
-        System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
       case TaskLocality.ANY =>
         0L
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9511ccfbed..7fd8d41c8c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming._
 
 /**
  * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
- * the jobs and runs them using a thread pool. Number of threads 
+ * the jobs and runs them using a thread pool. Number of threads
  */
 private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
@@ -33,7 +33,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   initLogging()
 
   val jobSets = new ConcurrentHashMap[Time, JobSet]
-  val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+  val numConcurrentJobs = ssc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   val generator = new JobGenerator(this)
   val listenerBus = new StreamingListenerBus()
-- 
GitLab
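
Note (not part of the patch): after this change, the affected settings are read from the per-application `SparkConf` carried by the context (`sc.conf`, `sched.sc.conf`, `ssc.conf`) instead of JVM-wide system properties. A minimal usage sketch of how an application would now supply these values, assuming the `SparkConf` API introduced in the same merge (`setMaster`, `setAppName`, `set`); a rough illustration, not code from this commit:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ConfExample {
  def main(args: Array[String]): Unit = {
    // Settings go onto the application's SparkConf rather than System properties.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("conf-example")
      .set("spark.task.maxFailures", "8")        // read via sc.conf.getOrElse(...) after this patch
      .set("spark.speculation.quantile", "0.9")  // likewise read from conf in TaskSetManager

    val sc = new SparkContext(conf)
    // ... run jobs; scheduler components pick these keys up from sc.conf ...
    sc.stop()
  }
}
```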