Skip to content
Snippets Groups Projects
Commit 28115fa8 authored by soulmachine's avatar soulmachine
Browse files

replace the thread with a Akka scheduler

parent dd63c548
No related branches found
No related tags found
No related merge requests found
......@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import akka.util.duration._
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
......@@ -119,21 +121,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
backend.start()
if (System.getProperty("spark.speculation", "false").toBoolean) {
new Thread("ClusterScheduler speculation check") {
setDaemon(true)
override def run() {
logInfo("Starting speculative execution thread")
while (true) {
try {
Thread.sleep(SPECULATION_INTERVAL)
} catch {
case e: InterruptedException => {}
}
checkSpeculatableTasks()
}
}
}.start()
logInfo("Starting speculative execution thread")
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
checkSpeculatableTasks()
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment