From 28115fa8cb942c907a90e48ee1171f2a9b698411 Mon Sep 17 00:00:00 2001
From: soulmachine <soulmachine@gmail.com>
Date: Sat, 9 Nov 2013 22:38:27 +0800
Subject: [PATCH] replace the thread with a Akka scheduler

---
 .../scheduler/cluster/ClusterScheduler.scala  | 23 +++++++------------
 1 file changed, 8 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 85033958ef..53a589615d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 
+import akka.util.duration._
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler._
@@ -119,21 +121,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     backend.start()
 
     if (System.getProperty("spark.speculation", "false").toBoolean) {
-      new Thread("ClusterScheduler speculation check") {
-        setDaemon(true)
-
-        override def run() {
-          logInfo("Starting speculative execution thread")
-          while (true) {
-            try {
-              Thread.sleep(SPECULATION_INTERVAL)
-            } catch {
-              case e: InterruptedException => {}
-            }
-            checkSpeculatableTasks()
-          }
-        }
-      }.start()
+      logInfo("Starting speculative execution thread")
+
+      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
+            SPECULATION_INTERVAL milliseconds) {
+        checkSpeculatableTasks()
+      }
     }
   }
 
-- 
GitLab