Commit ffe272d9 authored by Aaron Davidson

Revert "SPARK-1099:Spark's local mode should probably respect spark.cores.max by default"

This reverts commit 16789317. Jenkins was not run for this PR.
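Note: with this revert applied, the bare "local" master string once again creates a LocalBackend with exactly one core and ignores spark.cores.max. A minimal sketch of how a caller would still request more local parallelism (the app name and thread count below are illustrative, not taken from this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // After the revert, "local" schedules tasks on a single thread,
    // regardless of any spark.cores.max setting.
    val conf = new SparkConf().setAppName("example").setMaster("local")

    // To run with more threads in local mode, encode the count in the
    // master string instead: "local[4]" for four threads (or, in Spark
    // versions that support it, "local[*]" for one thread per core).
    // conf.setMaster("local[4]")

    val sc = new SparkContext(conf)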
parent 16789317
@@ -1262,10 +1262,7 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        // Use user specified in config, up to all available cores
-        val realCores = Runtime.getRuntime.availableProcessors()
-        val toUseCores = math.min(sc.conf.getInt("spark.cores.max", realCores), realCores)
-        val backend = new LocalBackend(scheduler, toUseCores)
+        val backend = new LocalBackend(scheduler, 1)
         scheduler.initialize(backend)
         scheduler
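For reference, the removed lines capped the requested core count at the machine's processor count. Isolated, the reverted logic reads as follows (a sketch reusing the names from the removed lines):

    // Reverted logic: honor spark.cores.max if set, but never schedule
    // more local threads than the machine has processors.
    val realCores = Runtime.getRuntime.availableProcessors()
    val toUseCores = math.min(sc.conf.getInt("spark.cores.max", realCores), realCores)
    // e.g. on an 8-core machine with spark.cores.max = 16, toUseCores is 8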
@@ -34,7 +34,7 @@ import org.apache.spark.SparkContext._
 class FileSuite extends FunSuite with LocalSparkContext {

   test("text files") {
-    sc = new SparkContext("local[1]", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
@@ -176,7 +176,7 @@ class FileSuite extends FunSuite with LocalSparkContext {

   test("write SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-    sc = new SparkContext("local[1]", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -27,10 +27,10 @@ import org.apache.spark.scheduler.local.LocalBackend
 class SparkContextSchedulerCreationSuite
   extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {

-  def createTaskScheduler(master: String, conf: SparkConf = new SparkConf()): TaskSchedulerImpl = {
+  def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    sc = new SparkContext("local", "test", conf)
+    sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
     val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
     sched.asInstanceOf[TaskSchedulerImpl]
@@ -44,26 +44,13 @@ class SparkContextSchedulerCreationSuite
   }

   test("local") {
-    var conf = new SparkConf()
-    conf.set("spark.cores.max", "1")
-    val sched = createTaskScheduler("local", conf)
+    val sched = createTaskScheduler("local")
     sched.backend match {
       case s: LocalBackend => assert(s.totalCores === 1)
       case _ => fail()
     }
   }

-  test("local-cores-exceed") {
-    val cores = Runtime.getRuntime.availableProcessors() + 1
-    var conf = new SparkConf()
-    conf.set("spark.cores.max", cores.toString)
-    val sched = createTaskScheduler("local", conf)
-    sched.backend match {
-      case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
-      case _ => fail()
-    }
-  }
-
   test("local-n") {
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)