Commit 30090884 authored by Kay Ousterhout

[SPARK-8880] Fix confusing Stage.attemptId member variable

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #7275 from kayousterhout/SPARK-8880 and squashes the following commits:

3e9ce7c [Kay Ousterhout] Added missing return type
e150278 [Kay Ousterhout] [SPARK-8880] Fix confusing Stage.attemptId member variable
parent c472eb17
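For context, a minimal sketch (hypothetical names, not the actual Spark classes) of why the removed members were confusing: newAttemptId() hands out the current attempt's ID and then bumps the counter, while attemptId reads the counter directly, so the two disagree as soon as an attempt exists.

// Sketch only: mirrors the removed Stage members (nextAttemptId, newAttemptId, attemptId).
class OldStyleStage {
  private var nextAttemptId: Int = 0
  def newAttemptId(): Int = { val id = nextAttemptId; nextAttemptId += 1; id }
  def attemptId: Int = nextAttemptId
}

val stage = new OldStyleStage
val submittedAttempt = stage.newAttemptId()  // the attempt actually submitted: 0
assert(stage.attemptId == 1)                 // but attemptId now reports 1

After this change, attemptId and newAttemptId() are gone; an attempt's ID is recorded only in the StageInfo created by makeNewStageAttempt, and the DAGScheduler reads it back through stage.latestInfo.attemptId when building the TaskSet.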
@@ -872,7 +872,7 @@ class DAGScheduler(
     // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
-    stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+    stage.makeNewStageAttempt(partitionsToCompute.size)
     outputCommitCoordinator.stageStart(stage.id)
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
@@ -937,8 +937,8 @@ class DAGScheduler(
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       stage.pendingTasks ++= tasks
       logDebug("New pending tasks: " + stage.pendingTasks)
-      taskScheduler.submitTasks(
-        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
+      taskScheduler.submitTasks(new TaskSet(
+        tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
@@ -62,22 +62,28 @@ private[spark] abstract class Stage(
   var pendingTasks = new HashSet[Task[_]]

   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0

   val name = callSite.shortForm
   val details = callSite.longForm

-  /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
-  var latestInfo: StageInfo = StageInfo.fromStage(this)
+  /**
+   * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
+   * here, before any attempts have actually been created, because the DAGScheduler uses this
+   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
+   * have been created).
+   */
+  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

-  /** Return a new attempt id, starting with 0. */
-  def newAttemptId(): Int = {
-    val id = nextAttemptId
+  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
+  def makeNewStageAttempt(numPartitionsToCompute: Int): Unit = {
+    _latestInfo = StageInfo.fromStage(this, nextAttemptId, Some(numPartitionsToCompute))
     nextAttemptId += 1
-    id
   }

-  def attemptId: Int = nextAttemptId
+  /** Returns the StageInfo for the most recent attempt for this stage. */
+  def latestInfo: StageInfo = _latestInfo

   override final def hashCode(): Int = id

   override final def equals(other: Any): Boolean = other match {
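The pattern introduced above can be seen in isolation with a small, self-contained sketch (hypothetical names, not Spark code): the attempt counter stays private, and each attempt's ID is frozen into an immutable info object at the moment the attempt is created, so the only way to learn the current attempt ID is through that object.

// Sketch only: AttemptInfo stands in for StageInfo, StageLike for Stage.
case class AttemptInfo(attemptId: Int, numTasks: Int)

class StageLike(defaultNumTasks: Int) {
  private var nextAttemptId: Int = 0
  // Initialized eagerly, mirroring _latestInfo above, so there is always an
  // info object to report even before the first attempt is made.
  private var _latestInfo: AttemptInfo = AttemptInfo(nextAttemptId, defaultNumTasks)

  def makeNewAttempt(numTasks: Int): Unit = {
    _latestInfo = AttemptInfo(nextAttemptId, numTasks)
    nextAttemptId += 1
  }

  def latestInfo: AttemptInfo = _latestInfo
}

val s = new StageLike(defaultNumTasks = 8)
s.makeNewAttempt(numTasks = 4)
// A scheduler-like caller takes the attempt ID from the same object it reports
// to listeners, so the two can never disagree.
val attemptForTaskSet = s.latestInfo.attemptId  // 0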
@@ -70,12 +70,12 @@ private[spark] object StageInfo {
    * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
    * sequence of narrow dependencies should also be associated with this Stage.
    */
-  def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
+  def fromStage(stage: Stage, attemptId: Int, numTasks: Option[Int] = None): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
     new StageInfo(
       stage.id,
-      stage.attemptId,
+      attemptId,
       stage.name,
       numTasks.getOrElse(stage.numTasks),
       rddInfos,