diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 838179c6b582ef56642e7e59a0eb9ab636a35643..2a2f828be69676986a188a2a120305ba6a57a611 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -60,7 +60,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont taskSets += taskSet } override def cancelTasks(stageId: Int) {} - override def setListener(listener: TaskSchedulerListener) = {} + override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index 80d0c5a5e929ac2c62c2eef41e70f75b9789d841..b97f2b19b581c3aa1f82899905bf564b5177afad 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -28,6 +28,30 @@ import org.apache.spark.executor.TaskMetrics import java.nio.ByteBuffer import org.apache.spark.util.{Utils, FakeClock} +class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) { + taskScheduler.startedTasks += taskInfo.index + } + + override def taskEnded( + task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: mutable.Map[Long, Any], + taskInfo: TaskInfo, + taskMetrics: TaskMetrics) { + taskScheduler.endedTasks(taskInfo.index) = reason + } + + override def executorGained(execId: String, host: String) {} + + override def executorLost(execId: String) {} + + override def taskSetFailed(taskSet: TaskSet, reason: String) { + taskScheduler.taskSetsFailed += taskSet.id + } +} + /** * A mock ClusterScheduler implementation that just remembers information about tasks started and * feedback received from the TaskSetManagers. Note that it's important to initialize this with @@ -44,30 +68,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* val executors = new mutable.HashMap[String, String] ++ liveExecutors - listener = new TaskSchedulerListener { - def taskStarted(task: Task[_], taskInfo: TaskInfo) { - startedTasks += taskInfo.index - } - - def taskEnded( - task: Task[_], - reason: TaskEndReason, - result: Any, - accumUpdates: mutable.Map[Long, Any], - taskInfo: TaskInfo, - taskMetrics: TaskMetrics) - { - endedTasks(taskInfo.index) = reason - } - - def executorGained(execId: String, host: String) {} - - def executorLost(execId: String) {} - - def taskSetFailed(taskSet: TaskSet, reason: String) { - taskSetsFailed += taskSet.id - } - } + dagScheduler = new FakeDAGScheduler(this) def removeExecutor(execId: String): Unit = executors -= execId