From 6b5980da796e0204a7735a31fb454f312bc9daac Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@cs.berkeley.edu>
Date: Wed, 19 Sep 2012 15:36:16 -0700
Subject: [PATCH] Set a limited number of retry in standalone deploy mode.

---
 .../scala/spark/deploy/master/JobInfo.scala   |  9 +++++
 .../scala/spark/deploy/master/JobState.scala  |  2 +
 .../scala/spark/deploy/master/Master.scala    | 37 ++++++++++++-------
 3 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 31d48b82b9..4c81a1b447 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -31,4 +31,13 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
   }
 
   def coresLeft: Int = desc.cores - coresGranted
+
+  private var _retryCount = 0
+
+  def retryCount = _retryCount
+
+  def incrementRetryCount = {
+    _retryCount += 1
+    _retryCount
+  }
 }
diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala
index 50b0c6f95b..8d458ac39c 100644
--- a/core/src/main/scala/spark/deploy/master/JobState.scala
+++ b/core/src/main/scala/spark/deploy/master/JobState.scala
@@ -4,4 +4,6 @@ object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED")
   type JobState = Value
 
   val WAITING, RUNNING, FINISHED, FAILED = Value
+
+  val MAX_NUM_RETRY = 10
 }
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c98dddea7b..5cc73633ab 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -1,19 +1,18 @@
 package spark.deploy.master
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
 import akka.actor._
-import spark.{Logging, Utils}
-import spark.util.AkkaUtils
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+
 import java.text.SimpleDateFormat
 import java.util.Date
-import akka.remote.RemoteClientLifeCycleEvent
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
 import spark.deploy._
-import akka.remote.RemoteClientShutdown
-import akka.remote.RemoteClientDisconnected
-import spark.deploy.RegisterWorker
-import spark.deploy.RegisterWorkerFailed
-import akka.actor.Terminated
+import spark.{Logging, SparkException, Utils}
+import spark.util.AkkaUtils
+
 
 class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For job IDs
@@ -81,12 +80,22 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
           exec.state = state
           exec.job.actor ! ExecutorUpdated(execId, state, message)
           if (ExecutorState.isFinished(state)) {
+            val jobInfo = idToJob(jobId)
             // Remove this executor from the worker and job
             logInfo("Removing executor " + exec.fullId + " because it is " + state)
-            idToJob(jobId).removeExecutor(exec)
+            jobInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
-            // TODO: the worker would probably want to restart the executor a few times
-            schedule()
+
+            // Only retry certain number of times so we don't go into an infinite loop.
+            if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
+              schedule()
+            } else {
+              val e = new SparkException("Job %s wth ID %s failed %d times.".format(
+                jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
+              logError(e.getMessage, e)
+              throw e
+              //System.exit(1)
+            }
           }
         }
         case None =>
@@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
       addressToWorker.get(address).foreach(removeWorker)
       addressToJob.get(address).foreach(removeJob)
     }
-    
+
     case RequestMasterState => {
       sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
     }
-- 
GitLab