diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala index be58957806b8a90e6f5b910ca4aa47a9cd1c1f01..6075019b8d077be18fefb5f5101eefb7afdd0ab8 100644 --- a/src/scala/spark/MesosScheduler.scala +++ b/src/scala/spark/MesosScheduler.scala @@ -299,9 +299,11 @@ extends ParallelOperation with Logging def taskFinished(status: TaskStatus) { val tid = status.getTaskId - logInfo("Finished opId " + opId + " TID " + tid) val index = tidToIndex(tid) if (!finished(index)) { + tasksFinished += 1 + logInfo("Finished opId %d TID %d (progress: %d/%d)".format( + opId, tid, tasksFinished, numTasks)) // Deserialize task result val result = Utils.deserialize[TaskResult[T]](status.getData) results(index) = result.value @@ -311,25 +313,25 @@ extends ParallelOperation with Logging finished(index) = true // Remove TID -> opId mapping from sched sched.taskIdToOpId.remove(tid) - tasksFinished += 1 - logInfo("Progress: " + tasksFinished + "/" + numTasks) if (tasksFinished == numTasks) setAllFinished() } else { - logInfo("Task " + index + " has already finished, so ignoring it") + logInfo("Ignoring task-finished event for TID " + tid + + " because task " + index + " is already finished") } } def taskLost(status: TaskStatus) { val tid = status.getTaskId - logInfo("Lost opId " + opId + " TID " + tid) val index = tidToIndex(tid) if (!finished(index)) { + logInfo("Lost opId " + opId + " TID " + tid) launched(index) = false sched.taskIdToOpId.remove(tid) tasksLaunched -= 1 } else { - logInfo("Task " + index + " has already finished, so ignoring it") + logInfo("Ignoring task-lost event for TID " + tid + + " because task " + index + " is already finished") } }