diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 4c544275c2b8c38bb413044637c6095eaac6134d..16258f35219fc9ecaac7ba54bfac4a53c2c60286 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -122,7 +122,7 @@ private[spark] class Executor( // Start worker thread pool val threadPool = new ThreadPoolExecutor( - 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) + 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory) // Maintains the list of running tasks. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index a4e86538f99df8de7686e7e05ddfbc8f69aa6ff9..873801e86764f1c9cd4f5c8d7f7ea9d29baabb23 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -48,9 +48,12 @@ private[spark] trait SchedulableBuilder { return Some(tsm) } case pool: Pool => - getTsm(pool) + val found = getTsm(pool) + if (found.isDefined) { + return getTsm(pool) + } } - return None + None } getTsm(rootPool) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 250dec5126eb2df2436b8a8425eabb1c02332279..6c12ff7370bcdcc9cdd34a68747efe66d93f6c62 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -78,12 +78,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) private val executorIdToHost = new HashMap[String, String] - // JAR server, if any JARs were added by the user to the SparkContext - var jarServer: HttpServer = null - - // URIs of JARs to pass to executor - var jarUris: String = "" - // Listener object to pass upcalls into var listener: TaskSchedulerListener = null @@ -356,9 +350,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (backend != null) { backend.stop() } - if (jarServer != null) { - jarServer.stop() - } if (taskResultGetter != null) { taskResultGetter.stop() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala index c0578dcaa128be21fed43bfbe1c26816986086e7..5367218faa685cdbe15c79c6f00de8b7fbf35c21 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala @@ -30,12 +30,8 @@ private[spark] trait SchedulerBackend { def reviveOffers(): Unit def defaultParallelism(): Int - def killTask(taskId: Long, executorId: String) { - throw new UnsupportedOperationException - } + def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException // Memory used by each executor (in megabytes) protected val executorMemory: Int = SparkContext.executorMemoryRequested - - // TODO: Probably want to add a killTask too } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index 0a6f4df902e7dbdc7db5a02a6c1e92b71dd40edd..dc6509d1957ce725d9863db10669ab25ee0fb458 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -209,7 +209,6 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: } override def stop() { - //threadPool.shutdownNow() } override def defaultParallelism() = threads diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala index af76c843e86b1704bf668d18d8ec333f18caeed0..d46a7469c7075ed74efd765ac2cdb1c974de77a7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala @@ -17,17 +17,15 @@ package org.apache.spark.scheduler.local -import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter - -import org.apache.spark._ -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster._ -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.{ConcurrentMap, HashMap} import java.util.concurrent.Semaphore import java.util.concurrent.CountDownLatch -import java.util.Properties + +import scala.collection.mutable.HashMap + +import org.scalatest.{BeforeAndAfterEach, FunSuite} + +import org.apache.spark._ + class Lock() { var finished = false @@ -63,7 +61,11 @@ object TaskThreadInfo { * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue, * thus it will be scheduled later when cluster has free cpu cores. */ -class LocalSchedulerSuite extends FunSuite with LocalSparkContext { +class LocalSchedulerSuite extends FunSuite with LocalSparkContext with BeforeAndAfterEach { + + override def afterEach() { + System.clearProperty("spark.scheduler.mode") + } def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {