diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index aed9826377ddf8b34c37cab315934a2bb77747d1..04d01e9ce8d363ce5f28a49836f54b3beb038f3d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
 import spark.scheduler._
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
 
 /**
  * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,8 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
-  // How often to check for starved TaskSets
-  val STARVATION_CHECK_INTERVAL = System.getProperty("spark.starvation_check.interval", "5000").toLong
+  // Threshold above which we warn the user that the initial TaskSet may be starved
+  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -32,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   val taskIdToExecutorId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
 
+  @volatile var hasReceivedTask = false   // set once the first TaskSet has been submitted
+  @volatile var hasLaunchedTask = false   // set once resourceOffers() has handed out tasks
+  val starvationTimer = new Timer(true)   // daemon timer used for the starvation warning
+
   // Incrementing Mesos task IDs
   val nextTaskId = new AtomicLong(0)
 
@@ -86,21 +91,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       }.start()
     }
-
-    new Thread("ClusterScheduler starvation check") {
-      setDaemon(true)
-
-      override def run() {
-        while (true) {
-          try {
-            Thread.sleep(STARVATION_CHECK_INTERVAL)
-          } catch {
-            case e: InterruptedException => {}
-          }
-          detectStarvedTaskSets()
-        }
-      }
-    }.start()
   }
 
   override def submitTasks(taskSet: TaskSet) {
@@ -111,6 +101,18 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       activeTaskSets(taskSet.id) = manager
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+      if (!hasReceivedTask) {
+        starvationTimer.scheduleAtFixedRate(new TimerTask() {
+          override def run() {
+            if (!hasLaunchedTask) {
+              logWarning("Initial TaskSet has not accepted any offers. " +
+                "Check the scheduler UI to ensure slaves are registered.")
+            }
+          }
+        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+      }
+      hasReceivedTask = true
     }
     backend.reviveOffers()
   }
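
The hunk above arms a daemon java.util.Timer the first time a TaskSet is submitted, replacing the polling thread deleted earlier in this patch. In isolation the pattern looks roughly like the sketch below; the object name, the hard-coded 5000 ms value, and the println stand-in for logWarning are illustrative assumptions, not part of the patch.

import java.util.{Timer, TimerTask}

// Standalone sketch of the starvation-warning pattern introduced above.
object StarvationWarningSketch {
  @volatile var hasLaunchedTask = false          // flipped once the scheduler hands out a task
  val starvationTimeout = 5000L                  // stands in for spark.starvation.timeout
  private val starvationTimer = new Timer(true)  // daemon thread, never blocks JVM shutdown

  // Called once, when the first TaskSet arrives; re-warns every starvationTimeout ms
  // until a task has been launched, after which the timer keeps firing silently.
  def armWarning() {
    starvationTimer.scheduleAtFixedRate(new TimerTask {
      override def run() {
        if (!hasLaunchedTask) {
          println("Initial TaskSet has not accepted any offers. " +
            "Check the scheduler UI to ensure slaves are registered.")
        }
      }
    }, starvationTimeout, starvationTimeout)
  }
}
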
@@ -167,6 +169,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
           }
         } while (launchedTask)
       }
+      if (tasks.size > 0) hasLaunchedTask = true
       return tasks
     }
   }
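
The flag flipped at the end of resourceOffers above is what eventually silences the timer. A hedged driver for the sketch shown after submitTasks illustrates that hand-off with the scheduler and backend stripped away; the demo object and the sleep durations are assumptions for illustration only.

object StarvationWarningDemo {
  def main(args: Array[String]) {
    StarvationWarningSketch.armWarning()             // as submitTasks() does for the first TaskSet
    Thread.sleep(12000)                              // roughly two warnings at the 5 s cadence
    StarvationWarningSketch.hasLaunchedTask = true   // as resourceOffers() does once tasks go out
    Thread.sleep(12000)                              // the timer keeps firing but prints nothing more
  }
}
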
@@ -266,20 +269,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  // Find and resource-starved TaskSets and alert the user
-  def detectStarvedTaskSets() {
-    val noOfferThresholdSeconds = 5
-    synchronized {
-      for (ts <- activeTaskSetsQueue) {
-        if (ts == TaskSetManager.firstTaskSet.get &&
-            (System.currentTimeMillis - ts.creationTime > noOfferThresholdSeconds * 1000) &&
-            ts.receivedOffers == 0) {
-          logWarning("No offers received. Check the scheduler UI to ensure slaves are registered.")
-        }
-      }
-    }
-  }
-
   def executorLost(executorId: String, reason: ExecutorLossReason) {
     var failedExecutor: Option[String] = None
     synchronized {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 58c5d4553e7ce5a5d78a56391d030c3b31412780..584cfdff45a1d0dafab96c93f6723e34ad40bc2e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -44,7 +44,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksFinished = 0
   val creationTime = System.currentTimeMillis
-  var receivedOffers = 0
 
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
@@ -98,8 +97,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     addPendingTask(i)
   }
 
-  if (!TaskSetManager.firstTaskSet.isDefined) TaskSetManager.firstTaskSet = Some(this)
-
   // Add a task to all the pending-task lists that it should be on.
   private def addPendingTask(index: Int) {
     val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
@@ -192,7 +189,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
 
   // Respond to an offer of a single slave from the scheduler by finding a task
   def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
-    receivedOffers += 1
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -432,7 +428,3 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return foundTasks
   }
 }
-
-object TaskSetManager {
-  var firstTaskSet: Option[TaskSetManager] = None
-}