Commit 0bd1900c authored by Matei Zaharia

Fix a few settings that were being read as system properties after merge

parent b4ceed40

@@ -47,10 +47,12 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
  */
 private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
-    val maxTaskFailures: Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+    val maxTaskFailures: Int,
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
+  def this(sc: SparkContext) = this(sc, sc.conf.getOrElse("spark.task.maxFailures", "4").toInt)
+
+  val conf = sc.conf

   // How often to check for speculative tasks
...
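The TaskSchedulerImpl hunk above shows the pattern repeated throughout this commit: settings move off process-global JVM system properties and onto the SparkConf carried by the SparkContext, so each context can be configured independently. A minimal sketch of the new read path, using the getOrElse accessor exactly as it appears in this diff (later Spark versions expose the same lookup as conf.get(key, default)); the app name and value are illustrative:

import org.apache.spark.SparkConf

// Configuration travels with the conf object instead of mutating
// global JVM state through System.setProperty.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("maxFailures-demo")      // illustrative name
  .set("spark.task.maxFailures", "8")

// Old pattern, removed by this commit (reads a JVM-wide property):
//   System.getProperty("spark.task.maxFailures", "4").toInt
// New pattern, scoped to this conf:
val maxTaskFailures = conf.getOrElse("spark.task.maxFailures", "4").toInt  // 8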
@@ -54,12 +54,14 @@ private[spark] class TaskSetManager(
     clock: Clock = SystemClock)
   extends Schedulable with Logging
 {
+  val conf = sched.sc.conf
+
   // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+  val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt

   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
-  val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+  val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
+  val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble

   // Serializer for closures and tasks.
   val env = SparkEnv.get
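Worked through with assumed numbers, the two speculation settings above interact as follows; this is a standalone illustration of the thresholds, not the Spark source:

// Quantile: fraction of the task set that must finish before speculation
// is considered. Multiplier: how much slower than the median successful
// task a running task must be to get a speculative copy. All concrete
// numbers below are assumptions for the example.
val quantile   = 0.75   // spark.speculation.quantile default
val multiplier = 1.5    // spark.speculation.multiplier default

val numTasks       = 100                                // assumed task set size
val finishedNeeded = (quantile * numTasks).floor.toInt  // 75 tasks must finish first
val medianMs       = 2000L                              // assumed median task time
val thresholdMs    = (multiplier * medianMs).toLong     // candidates run over 3000 ms

println(s"speculate after $finishedNeeded finishes, for tasks slower than $thresholdMs ms")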
@@ -118,7 +120,7 @@ private[spark] class TaskSetManager(
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
-    System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+    conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong

   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception

@@ -682,14 +684,14 @@ private[spark] class TaskSetManager(
   }

   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = System.getProperty("spark.locality.wait", "3000")
+    val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
     level match {
       case TaskLocality.PROCESS_LOCAL =>
-        System.getProperty("spark.locality.wait.process", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
       case TaskLocality.NODE_LOCAL =>
-        System.getProperty("spark.locality.wait.node", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
       case TaskLocality.RACK_LOCAL =>
-        System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
       case TaskLocality.ANY =>
         0L
     }
...
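The getLocalityWait hunk keeps the same fallback chain and only switches where the values come from: each locality level has its own key, and each falls back to the shared spark.locality.wait default (3000 ms). A self-contained sketch of that chain, modeling the conf as a plain Map so it runs without Spark on the classpath:

// Per-level wait keys fall back to the shared default; ANY never waits.
def localityWait(conf: Map[String, String], level: String): Long = {
  val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
  level match {
    case "PROCESS_LOCAL" => conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
    case "NODE_LOCAL"    => conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
    case "RACK_LOCAL"    => conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
    case _               => 0L  // TaskLocality.ANY
  }
}

// Only the node-level wait is overridden; the process level inherits the default.
localityWait(Map("spark.locality.wait.node" -> "6000"), "NODE_LOCAL")     // 6000
localityWait(Map("spark.locality.wait.node" -> "6000"), "PROCESS_LOCAL")  // 3000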
@@ -25,7 +25,7 @@ import org.apache.spark.streaming._
 /**
  * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
- * the jobs and runs them using a thread pool. Number of threads
+ * the jobs and runs them using a thread pool. Number of threads
  */
 private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {

@@ -33,7 +33,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   initLogging()

   val jobSets = new ConcurrentHashMap[Time, JobSet]
-  val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+  val numConcurrentJobs = ssc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   val generator = new JobGenerator(this)
   val listenerBus = new StreamingListenerBus()
...
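Because JobScheduler now sizes its thread pool from ssc.conf rather than a system property, spark.streaming.concurrentJobs has to be set on the SparkConf before the StreamingContext is constructed. A hedged usage sketch, assuming the StreamingContext(SparkConf, Duration) constructor; master and app name are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[4]")                       // illustrative master
  .setAppName("concurrent-jobs-demo")          // illustrative app name
  .set("spark.streaming.concurrentJobs", "2")  // thread pool of 2

val ssc = new StreamingContext(conf, Seconds(1))
// The fixed thread pool is sized from this value, so two independent
// output operations from the same batch can run at the same time.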