Skip to content
Snippets Groups Projects
Commit 87954d4c authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #154 from soulmachine/ClusterScheduler

Replace the thread inside ClusterScheduler.start() with an Akka scheduler

Threads are precious resources so that we shouldn't abuse them
parents 83bf1920 28115fa8
No related branches found
No related tags found
No related merge requests found
...@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer ...@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet import scala.collection.mutable.HashSet
import akka.util.duration._
import org.apache.spark._ import org.apache.spark._
import org.apache.spark.TaskState.TaskState import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._ import org.apache.spark.scheduler._
...@@ -119,21 +121,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -119,21 +121,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
backend.start() backend.start()
if (System.getProperty("spark.speculation", "false").toBoolean) { if (System.getProperty("spark.speculation", "false").toBoolean) {
new Thread("ClusterScheduler speculation check") { logInfo("Starting speculative execution thread")
setDaemon(true)
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
override def run() { SPECULATION_INTERVAL milliseconds) {
logInfo("Starting speculative execution thread") checkSpeculatableTasks()
while (true) { }
try {
Thread.sleep(SPECULATION_INTERVAL)
} catch {
case e: InterruptedException => {}
}
checkSpeculatableTasks()
}
}
}.start()
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment